kudu-commits mailing list archives

From: a...@apache.org
Subject: [kudu] 01/07: Add functionality to serialize a RowBlock into columnar format
Date: Tue, 31 Mar 2020 05:24:46 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1d28104cac0866c83b3022e3ddf3b7237962c1da
Author: Todd Lipcon <todd@apache.org>
AuthorDate: Wed Mar 25 11:38:56 2020 -0700

    Add functionality to serialize a RowBlock into columnar format
    
    This adds the core functionality to take a RowBlock and convert it into
    a set of buffers for each column: the cell data, the null bitmap, and
    the indirect (string) data.
    
    This also updates wire_protocol-test to add coverage for nulls and
    unselected rows, and adds a comparison benchmark. The columnar code path
    is 4-5x faster in the best case, and in the worst case only 30% slower
    than the existing row-wise code path.
    
    Some follow-up commits will add further optimizations.
    
    Change-Id: I287a8aa6736f19816b0edbe16409c01f35c0319e
    Reviewed-on: http://gerrit.cloudera.org:8080/15560
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/CMakeLists.txt            |   2 +-
 src/kudu/common/columnar_serialization.cc | 134 ++++++++-
 src/kudu/common/columnar_serialization.h  |  38 ++-
 src/kudu/common/wire_protocol-test.cc     | 442 ++++++++++++++++++++++--------
 src/kudu/util/faststring.h                |  10 +
 5 files changed, 510 insertions(+), 116 deletions(-)
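
As a quick orientation before the diff, the following is a minimal usage sketch of the new entry point, based only on the declarations added in columnar_serialization.h below. The helper name and the loop over blocks are illustrative assumptions, not part of the commit.

// Hedged usage sketch (not in the commit): feed several RowBlocks from a scan
// into one ColumnarSerializedBatch using the API added below.
#include <vector>

#include "kudu/common/columnar_serialization.h"
#include "kudu/common/rowblock.h"

namespace kudu {

int ExampleSerializeBlocks(const std::vector<RowBlock*>& blocks,   // hypothetical helper
                           ColumnarSerializedBatch* batch) {
  int total_selected = 0;
  for (const RowBlock* block : blocks) {
    // Appends the selected rows of 'block' to 'batch'. Passing nullptr for the
    // projection uses the block's own schema (see SerializeRowBlockColumnar below).
    total_selected += SerializeRowBlockColumnar(*block, /*projection_schema=*/nullptr, batch);
  }
  // Each batch->columns[i] now holds the concatenated cell data, plus an optional
  // non-null bitmap and an optional indirect buffer for BINARY columns, suitable
  // for attaching to an RPC response as sidecars.
  return total_selected;
}

} // namespace kudu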

diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index 0a1814e..4ffe9c4 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -99,4 +99,4 @@ ADD_KUDU_TEST(scan_spec-test)
 ADD_KUDU_TEST(schema-test)
 ADD_KUDU_TEST(table_util-test)
 ADD_KUDU_TEST(types-test)
-ADD_KUDU_TEST(wire_protocol-test NUM_SHARDS 4)
+ADD_KUDU_TEST(wire_protocol-test NUM_SHARDS 10)
diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index 69e8da9..5e647d6 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -19,7 +19,6 @@
 
 #include <immintrin.h>
 
-#include <cstdint>
 #include <cstring>
 #include <ostream>
 #include <string>
@@ -27,11 +26,17 @@
 
 #include <glog/logging.h>
 
+#include "kudu/common/columnblock.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/rowblock.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/types.h"
 #include "kudu/common/zp7.h"
 #include "kudu/gutil/cpu.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/slice.h"
 
 using std::vector;
 
@@ -360,6 +365,133 @@ void CopySelectedRows(const vector<uint16_t>& sel_rows,
   }
 }
 
+namespace {
+// For each of the Slices in 'cells_buf', copy the pointed-to data into 'indirect' and
+// modify the Slice so that its 'pointer' field is not an actual memory pointer, but
+// rather an offset within the indirect data buffer.
+void RelocateSlicesToIndirect(uint8_t* __restrict__ cells_buf, int n_rows,
+                              faststring* indirect) {
+  Slice* cell_slices = reinterpret_cast<Slice*>(cells_buf);
+  size_t total_size = 0;
+  for (int i = 0; i < n_rows; i++) {
+    total_size += cell_slices[i].size();
+  }
+
+  int old_size = indirect->size();
+  indirect->resize_with_extra_capacity(old_size + total_size);
+
+  uint8_t* dst_base = indirect->data();
+  uint8_t* dst = dst_base + old_size;
+
+  for (int i = 0; i < n_rows; i++) {
+    Slice* s = &cell_slices[i];
+    if (!s->empty()) {
+      memcpy(dst, s->data(), s->size());
+    }
+    *s = Slice(reinterpret_cast<const uint8_t*>(dst - dst_base), s->size());
+    dst += s->size();
+  }
+}
+
+// Specialized division for the known type sizes. Despite having some branching here,
+// this is faster than a 'div' instruction which has a 20+ cycle latency.
+size_t div_type_size(size_t s, size_t divisor) {
+  switch (divisor) {
+    case 1: return s;
+    case 2: return s/2;
+    case 4: return s/4;
+    case 8: return s/8;
+    case 16: return s/16;
+    default: return s/divisor;
+  }
+}
+
+
+// Copy the selected cells (and non-null-bitmap bits) from 'cblock' into 'dst' according to
+// the given 'sel_rows'.
+void CopySelectedCellsFromColumn(const ColumnBlock& cblock,
+                                 const SelectedRows& sel_rows,
+                                 ColumnarSerializedBatch::Column* dst) {
+  size_t type_size = cblock.type_info()->size();
+  int n_sel = sel_rows.num_selected();
+
+  // Number of initial rows in the dst values and null_bitmap.
+  DCHECK_EQ(dst->data.size() % type_size, 0);
+  size_t initial_rows = div_type_size(dst->data.size(), type_size);
+  size_t new_num_rows = initial_rows + n_sel;
+
+  dst->data.resize_with_extra_capacity(type_size * new_num_rows);
+  uint8_t* dst_buf = dst->data.data() + type_size * initial_rows;
+  const uint8_t* src_buf = cblock.cell_ptr(0);
+
+  if (sel_rows.all_selected()) {
+    memcpy(dst_buf, src_buf, type_size * n_sel);
+  } else {
+    CopySelectedRows(sel_rows.indexes(), type_size, src_buf, dst_buf);
+  }
+
+  if (cblock.is_nullable()) {
+    DCHECK_EQ(dst->non_null_bitmap->size(), BitmapSize(initial_rows));
+    dst->non_null_bitmap->resize_with_extra_capacity(BitmapSize(new_num_rows));
+    CopyNonNullBitmap(cblock.non_null_bitmap(),
+                      sel_rows.bitmap(),
+                      initial_rows, cblock.nrows(),
+                      dst->non_null_bitmap->data());
+    ZeroNullValues(type_size, initial_rows, n_sel, dst->data.data(), dst->non_null_bitmap->data());
+  }
+
+  if (cblock.type_info()->physical_type() == BINARY) {
+    RelocateSlicesToIndirect(dst_buf, n_sel, boost::get_pointer(dst->indirect_data));
+  }
+}
+} // anonymous namespace
 } // namespace internal
 
+int SerializeRowBlockColumnar(
+    const RowBlock& block,
+    const Schema* projection_schema,
+    ColumnarSerializedBatch* out) {
+  DCHECK_GT(block.nrows(), 0);
+  const Schema* tablet_schema = block.schema();
+
+  if (projection_schema == nullptr) {
+    projection_schema = tablet_schema;
+  }
+
+  // Initialize buffers for the columns.
+  // TODO(todd) don't pre-size these to 1MB per column -- quite
+  // expensive if there are a lot of columns!
+  if (out->columns.size() != projection_schema->num_columns()) {
+    CHECK_EQ(out->columns.size(), 0);
+    out->columns.reserve(projection_schema->num_columns());
+    for (const auto& col : projection_schema->columns()) {
+      out->columns.emplace_back();
+      out->columns.back().data.reserve(1024 * 1024);
+      if (col.type_info()->physical_type() == BINARY) {
+        out->columns.back().indirect_data.emplace();
+      }
+      if (col.is_nullable()) {
+        out->columns.back().non_null_bitmap.emplace();
+      }
+    }
+  }
+
+  SelectedRows sel = block.selection_vector()->GetSelectedRows();
+  int col_idx = 0;
+  for (const auto& col : projection_schema->columns()) {
+    int t_schema_idx = tablet_schema->find_column(col.name());
+    CHECK_NE(t_schema_idx, -1);
+    const ColumnBlock& column_block = block.column_block(t_schema_idx);
+
+    internal::CopySelectedCellsFromColumn(
+        column_block,
+        sel,
+        &out->columns[col_idx]);
+    col_idx++;
+  }
+
+  return sel.num_selected();
+}
+
+
 } // namespace kudu
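
The core trick in RelocateSlicesToIndirect() above is to keep each variable-length cell as an (offset, length) pair while the bytes themselves move into a single growable indirect buffer; a receiver can then rebuild real pointers as indirect_base + offset. Below is a self-contained sketch of the same idea using plain standard-library types rather than Kudu's Slice and faststring; the Cell struct and function name are illustrative, not Kudu code.

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Illustrative stand-ins: 'cells' plays the role of the fixed-width cell buffer of
// Slices, 'indirect' the per-column indirect buffer.
struct Cell {
  const uint8_t* ptr;  // before relocation: a real pointer; after: an offset into 'indirect'
  size_t len;
};

void RelocateToIndirect(std::vector<Cell>* cells, std::string* indirect) {
  size_t total = 0;
  for (const Cell& c : *cells) {
    total += c.len;
  }

  const size_t old_size = indirect->size();
  indirect->resize(old_size + total);
  uint8_t* base = reinterpret_cast<uint8_t*>(&(*indirect)[0]);
  uint8_t* dst = base + old_size;

  for (Cell& c : *cells) {
    if (c.len > 0) {
      memcpy(dst, c.ptr, c.len);
    }
    // Replace the pointer with its offset, as the commit does for Slice; a reader
    // reconstructs the pointer as indirect_base + offset.
    c.ptr = reinterpret_cast<const uint8_t*>(dst - base);
    dst += c.len;
  }
}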
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index 04025b7..d7ad494 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -19,15 +19,48 @@
 #include <cstdint>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
+#include "kudu/util/faststring.h"
+
 namespace kudu {
 
+class RowBlock;
+class Schema;
+
+// A pending batch of serialized rows, suitable for easy conversion
+// into the protobuf representation and a set of sidecars.
+struct ColumnarSerializedBatch {
+  struct Column {
+    // Underlying column data.
+    faststring data;
+
+    // Data for varlen columns (BINARY)
+    boost::optional<faststring> indirect_data;
+
+    // Each bit is set when a value is non-null
+    boost::optional<faststring> non_null_bitmap;
+  };
+  std::vector<Column> columns;
+};
+
+// Serialize the data in 'block' into the columnar batch 'out', appending to
+// any data already serialized to the same batch.
+//
+// Returns the number of selected rows serialized.
+int SerializeRowBlockColumnar(
+    const RowBlock& block,
+    const Schema* projection_schema,
+    ColumnarSerializedBatch* out);
+
+
 ////////////////////////////////////////////////////////////
 // Expose these internal functions for unit testing.
 // Do not call them outside of tests!
 // See .cc file for docs.
 ////////////////////////////////////////////////////////////
 namespace internal {
-void ZeroNullValues(int type_size,
+void ZeroNullValues(int sizeof_type,
                     int dst_idx,
                     int n_rows,
                     uint8_t* dst_values_buf,
@@ -40,7 +73,7 @@ void CopyNonNullBitmap(const uint8_t* non_null_bitmap,
                        uint8_t* dst_non_null_bitmap);
 
 void CopySelectedRows(const std::vector<uint16_t>& sel_rows,
-                      int type_size,
+                      int sizeof_type,
                       const uint8_t* __restrict__ src_buf,
                       uint8_t* __restrict__ dst_buf);
 
@@ -52,6 +85,7 @@ enum class PextMethod {
 #endif
   kSimple
 };
+
 extern PextMethod g_pext_method;
 
 std::vector<PextMethod> GetAvailablePextMethods();
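
To make the resulting layout concrete, here is a sketch of how a consumer might read one cell back out of a ColumnarSerializedBatch. It mirrors the checks in the new TestRowBlockToColumnarPB test further down; the helper name and the fixed INT32 type are assumptions for illustration.

#include <cstdint>
#include <cstring>

#include "kudu/common/columnar_serialization.h"
#include "kudu/util/bitmap.h"

namespace kudu {

// Hypothetical reader: fetch row 'r' of a nullable INT32 column serialized by
// SerializeRowBlockColumnar(). Returns false if the cell is null.
bool ReadNullableInt32(const ColumnarSerializedBatch::Column& col, int r, int32_t* out) {
  if (col.non_null_bitmap && !BitmapTest(col.non_null_bitmap->data(), r)) {
    return false;  // null: the corresponding bytes in 'data' were zeroed by ZeroNullValues()
  }
  memcpy(out, col.data.data() + sizeof(int32_t) * r, sizeof(int32_t));
  return true;
}

} // namespace kudu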
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 4c9ac0d..ae18f1e 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <list>
 #include <numeric>
 #include <ostream>
 #include <string>
@@ -30,12 +31,14 @@
 #include <gtest/gtest.h>
 
 #include "kudu/common/column_predicate.h"
+#include "kudu/common/columnar_serialization.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/bitmap.h"
@@ -43,7 +46,9 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/hexdump.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"  // IWYU pragma: keep
@@ -57,121 +62,58 @@ using strings::Substitute;
 
 namespace kudu {
 
-class WireProtocolTest : public KuduTest,
-                         // Used for benchmark, int corresponds to the number of columns,
-                         // double corresponds to the selection rate.
-                         public testing::WithParamInterface<tuple<int, double>> {
+class WireProtocolTest : public KuduTest {
  public:
   WireProtocolTest()
-      : schema_({ ColumnSchema("col1", STRING),
-              ColumnSchema("col2", STRING),
-              ColumnSchema("col3", UINT32, true /* nullable */) },
+      : schema_({ ColumnSchema("string", STRING),
+                  ColumnSchema("nullable_string", STRING, /* is_nullable=*/true),
+                  ColumnSchema("int", INT32),
+                  ColumnSchema("nullable_int", INT32, /* is_nullable=*/true),
+                  ColumnSchema("int64", INT64) },
         1),
         test_data_arena_(4096) {
   }
 
-  void FillRowBlockWithTestRows(RowBlock* block) {
-    test_data_arena_.Reset();
+  static void FillRowBlockWithTestRows(RowBlock* block) {
+    Random rng(SeedRandom());
+
     block->selection_vector()->SetAllTrue();
 
     for (int i = 0; i < block->nrows(); i++) {
+      if (rng.OneIn(10)) {
+        block->selection_vector()->SetRowUnselected(i);
+        continue;
+      }
+
       RowBlockRow row = block->row(i);
 
       // We make new copies of these strings into the Arena for each row so that
       // the workload is more realistic. If we just re-use the same Slice object
       // for each row, the memory accesses fit entirely into a smaller number of
       // cache lines and we may micro-optimize for the wrong thing.
-      Slice col1, col2;
-      CHECK(test_data_arena_.RelocateSlice("hello world col1", &col1));
-      CHECK(test_data_arena_.RelocateSlice("hello world col2", &col2));
-      *reinterpret_cast<Slice*>(row.mutable_cell_ptr(0)) = col1;
-      *reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)) = col2;
-      *reinterpret_cast<uint32_t*>(row.mutable_cell_ptr(2)) = i;
-      row.cell(2).set_null(false);
-    }
-  }
-
-  void ResetBenchmarkSchema(int num_columns) {
-    vector<ColumnSchema> column_schemas;
-    column_schemas.reserve(num_columns);
-    for (int i = 0; i < num_columns; i++) {
-      column_schemas.emplace_back(Substitute("col$0", i), i % 2 ? STRING : INT32);
-    }
-    benchmark_schema_.Reset(column_schemas, 1);
-  }
-
-  void FillRowBlockForBenchmark(RowBlock* block) {
-    test_data_arena_.Reset();
-    for (int i = 0; i < block->nrows(); i++) {
-      RowBlockRow row = block->row(i);
-      for (int j = 0; j < benchmark_schema_.num_columns(); j++) {
-        const ColumnSchema& column_schema = benchmark_schema_.column(j);
-        DataType type = column_schema.type_info()->type();
-        if (type == STRING) {
-          Slice col;
-          CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0",
-                                               column_schema.name()), &col));
-          memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice));
-        } else if (type == INT32) {
-          memcpy(row.mutable_cell_ptr(j), &i, sizeof(int32_t));
-        } else {
-          LOG(FATAL) << "Unexpected type.";
-        }
+      CHECK(block->arena()->RelocateSlice(
+          "hello world col0",
+          reinterpret_cast<Slice*>(row.mutable_cell_ptr(0))));
+
+      if (rng.OneIn(3)) {
+        row.cell(1).set_null(true);
+      } else {
+        row.cell(1).set_null(false);
+        CHECK(block->arena()->RelocateSlice(
+            "hello world col1",
+            reinterpret_cast<Slice*>(row.mutable_cell_ptr(1))));
       }
-    }
-  }
-
-  void SelectRandomRowsWithRate(RowBlock* block, double rate) {
-    CHECK_LE(rate, 1.0);
-    CHECK_GE(rate, 0.0);
-    int select_count = block->nrows() * rate;
-    SelectionVector* select_vector = block->selection_vector();
-    if (rate == 1.0) {
-      select_vector->SetAllTrue();
-    } else if (rate == 0.0) {
-      select_vector->SetAllFalse();
-    } else {
-      vector<int> indexes(block->nrows());
-      std::iota(indexes.begin(), indexes.end(), 0);
-      std::random_shuffle(indexes.begin(), indexes.end());
-      indexes.resize(select_count);
-      select_vector->SetAllFalse();
-      for (auto index : indexes) {
-        select_vector->SetRowSelected(index);
-      }
-    }
-    CHECK_EQ(select_vector->CountSelected(), select_count);
-  }
 
-  // Use column_count to control the schema scale.
-  // Use select_rate to control the number of selected rows.
-  void RunBenchmark(int column_count, double select_rate) {
-    ResetBenchmarkSchema(column_count);
-    Arena arena(1024);
-    RowBlock block(&benchmark_schema_, 1000, &arena);
-    // Regardless of the config, use a constant number of cells for the test by
-    // looping the conversion an appropriate number of times.
-    const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
-    const int kNumTrials = kNumCellsToConvert / select_rate / column_count / block.nrows();
-    FillRowBlockForBenchmark(&block);
-    SelectRandomRowsWithRate(&block, select_rate);
+      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(2)) = i;
+      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i;
+      row.cell(3).set_null(rng.OneIn(7));
 
-    faststring direct, indirect;
-    int64_t cycle_start = CycleClock::Now();
-    for (int i = 0; i < kNumTrials; ++i) {
-      direct.clear();
-      indirect.clear();
-      SerializeRowBlock(block, nullptr, &direct, &indirect);
+      *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i;
     }
-    int64_t cycle_end = CycleClock::Now();
-    LOG(INFO) << Substitute(
-        "Converting to PB with column count $0 and row select rate $1: $2 cycles/cell",
-        column_count, select_rate,
-        static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert);
   }
+
  protected:
   Schema schema_;
-  Schema benchmark_schema_;
   Arena test_data_arena_;
 };
 
@@ -221,25 +163,38 @@ TEST_F(WireProtocolTest, TestSchemaRoundTrip) {
   google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;
 
   ASSERT_OK(SchemaToColumnPBs(schema_, &pbs));
-  ASSERT_EQ(3, pbs.size());
+  ASSERT_EQ(5, pbs.size());
 
   // Column 0.
   EXPECT_TRUE(pbs.Get(0).is_key());
-  EXPECT_EQ("col1", pbs.Get(0).name());
+  EXPECT_EQ("string", pbs.Get(0).name());
   EXPECT_EQ(STRING, pbs.Get(0).type());
   EXPECT_FALSE(pbs.Get(0).is_nullable());
 
   // Column 1.
   EXPECT_FALSE(pbs.Get(1).is_key());
-  EXPECT_EQ("col2", pbs.Get(1).name());
+  EXPECT_EQ("nullable_string", pbs.Get(1).name());
   EXPECT_EQ(STRING, pbs.Get(1).type());
-  EXPECT_FALSE(pbs.Get(1).is_nullable());
+  EXPECT_TRUE(pbs.Get(1).is_nullable());
 
   // Column 2.
   EXPECT_FALSE(pbs.Get(2).is_key());
-  EXPECT_EQ("col3", pbs.Get(2).name());
-  EXPECT_EQ(UINT32, pbs.Get(2).type());
-  EXPECT_TRUE(pbs.Get(2).is_nullable());
+  EXPECT_EQ("int", pbs.Get(2).name());
+  EXPECT_EQ(INT32, pbs.Get(2).type());
+  EXPECT_FALSE(pbs.Get(2).is_nullable());
+
+
+  // Column 3.
+  EXPECT_FALSE(pbs.Get(3).is_key());
+  EXPECT_EQ("nullable_int", pbs.Get(3).name());
+  EXPECT_EQ(INT32, pbs.Get(3).type());
+  EXPECT_TRUE(pbs.Get(3).is_nullable());
+
+  // Column 4.
+  EXPECT_FALSE(pbs.Get(4).is_key());
+  EXPECT_EQ("int64", pbs.Get(4).name());
+  EXPECT_EQ(INT64, pbs.Get(4).type());
+  EXPECT_FALSE(pbs.Get(4).is_nullable());
 
   // Convert back to a Schema object and verify they're identical.
   Schema schema2;
@@ -304,11 +259,10 @@ TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) {
   ASSERT_EQ("Invalid argument: Duplicate column name: c0", s.ToString());
 }
 
-// Create a block of rows in columnar layout and ensure that it can be
-// converted to and from protobuf.
-TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) {
+// Create a block of rows and ensure that it can be converted to and from protobuf.
+TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
   Arena arena(1024);
-  RowBlock block(&schema_, 10, &arena);
+  RowBlock block(&schema_, 30, &arena);
   FillRowBlockWithTestRows(&block);
 
   // Convert to PB.
@@ -326,14 +280,82 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) {
   Slice direct_sidecar = direct;
   ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect,
                                       &direct_sidecar, &row_ptrs));
-  ASSERT_EQ(block.nrows(), row_ptrs.size());
+  ASSERT_EQ(block.selection_vector()->CountSelected(), row_ptrs.size());
+  int dst_row_idx = 0;
   for (int i = 0; i < block.nrows(); ++i) {
-    ConstContiguousRow row_roundtripped(&schema_, row_ptrs[i]);
+    if (!block.selection_vector()->IsRowSelected(i)) {
+      continue;
+    }
+    ConstContiguousRow row_roundtripped(&schema_, row_ptrs[dst_row_idx]);
     EXPECT_EQ(schema_.DebugRow(block.row(i)),
               schema_.DebugRow(row_roundtripped));
+    dst_row_idx++;
+  }
+}
+
+// Create blocks of rows and ensure that they can be converted to the columnar serialized
+// layout.
+TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
+  // Generate several blocks of random data.
+  static constexpr int kNumBlocks = 3;
+  Arena arena(1024);
+  std::list<RowBlock> blocks;
+  for (int i = 0; i < kNumBlocks; i++) {
+    blocks.emplace_back(&schema_, 30, &arena);
+    FillRowBlockWithTestRows(&blocks.back());
+  }
+
+  // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
+  ColumnarSerializedBatch batch;
+  for (const auto& block : blocks) {
+    SerializeRowBlockColumnar(block, nullptr, &batch);
+  }
+
+  // Verify that the resulting serialized data matches the concatenated original data blocks.
+  ASSERT_EQ(5, batch.columns.size());
+  int dst_row_idx = 0;
+  for (const auto& block : blocks) {
+    for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) {
+      if (!block.selection_vector()->IsRowSelected(src_row_idx)) {
+        continue;
+      }
+      SCOPED_TRACE(src_row_idx);
+      SCOPED_TRACE(dst_row_idx);
+      const auto& row = block.row(src_row_idx);
+      for (int c = 0; c < schema_.num_columns(); c++) {
+        SCOPED_TRACE(c);
+        const auto& col = schema_.column(c);
+        const auto& serialized_col = batch.columns[c];
+        if (col.is_nullable()) {
+          bool expect_null = row.is_null(c);
+          EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx),
+                    expect_null);
+          if (expect_null) {
+            continue;
+          }
+        }
+        int type_size = col.type_info()->size();
+        Slice serialized_val(serialized_col.data.data() + type_size * dst_row_idx,
+                             type_size);
+        Slice orig_val(row.cell_ptr(c), type_size);
+
+        if (col.type_info()->physical_type() == BINARY) {
+          orig_val = *reinterpret_cast<const Slice*>(orig_val.data());
+          serialized_val = *reinterpret_cast<const Slice*>(serialized_val.data());
+
+          uintptr_t indirect_offset = reinterpret_cast<uintptr_t>(serialized_val.data());
+          serialized_val = Slice(serialized_col.indirect_data->data() + indirect_offset,
+                                 serialized_val.size());
+        }
+
+        EXPECT_EQ(orig_val, serialized_val);
+      }
+      dst_row_idx++;
+    }
   }
 }
 
+
 // Create a block of rows in columnar layout and ensure that it can be
 // converted to and from protobuf.
 TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
@@ -427,15 +449,211 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
   }
 }
 
-TEST_P(WireProtocolTest, TestColumnarRowBlockToPBBenchmark) {
-  int column_count = std::get<0>(GetParam());
+struct RowwiseConverter {
+  static void Run(const RowBlock& block) {
+    faststring direct;
+    faststring indirect;
+    SerializeRowBlock(block, nullptr, &direct, &indirect);
+  }
+
+  static constexpr const char* kName = "row-wise";
+};
+
+
+struct ColumnarConverter {
+  static void Run(const RowBlock& block) {
+    ColumnarSerializedBatch batch;
+    SerializeRowBlockColumnar(block, nullptr, &batch);
+  }
+
+  static constexpr const char* kName = "columnar";
+};
+
+struct BenchmarkColumnsSpec {
+  struct Col {
+    DataType type;
+    double null_fraction; // negative for non-null
+  };
+  vector<Col> columns;
+  string name;
+};
+
+class WireProtocolBenchmark :
+      public WireProtocolTest,
+      public testing::WithParamInterface<tuple<BenchmarkColumnsSpec, double>> {
+ public:
+
+  void ResetBenchmarkSchema(const BenchmarkColumnsSpec& spec) {
+    vector<ColumnSchema> column_schemas;
+    int i = 0;
+    for (const auto& c : spec.columns) {
+      column_schemas.emplace_back(Substitute("col$0", i++),
+                                  c.type,
+                                  /*nullable=*/c.null_fraction >= 0);
+    }
+    CHECK_OK(benchmark_schema_.Reset(std::move(column_schemas), 0));
+  }
+
+  void FillRowBlockForBenchmark(const BenchmarkColumnsSpec& spec,
+                                RowBlock* block) {
+    Random rng(SeedRandom());
+
+    test_data_arena_.Reset();
+    for (int i = 0; i < block->nrows(); i++) {
+      RowBlockRow row = block->row(i);
+      for (int j = 0; j < benchmark_schema_.num_columns(); j++) {
+        const ColumnSchema& column_schema = benchmark_schema_.column(j);
+        DataType type = spec.columns[j].type;
+        bool is_null = rng.NextDoubleFraction() <= spec.columns[j].null_fraction;
+        if (column_schema.is_nullable()) {
+          row.cell(j).set_null(is_null);
+        }
+        if (!is_null) {
+          switch (type) {
+            case STRING: {
+              Slice col;
+              CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0",
+                                                              column_schema.name()), &col));
+              memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice));
+              break;
+            }
+            case INT128:
+              UnalignedStore<int128_t>(row.mutable_cell_ptr(j), i);
+              break;
+            case INT64:
+              UnalignedStore<int64_t>(row.mutable_cell_ptr(j), i);
+              break;
+            case INT32:
+              UnalignedStore<int32_t>(row.mutable_cell_ptr(j), i);
+              break;
+            case INT16:
+              UnalignedStore<int16_t>(row.mutable_cell_ptr(j), i);
+              break;
+            case INT8:
+              UnalignedStore<int8_t>(row.mutable_cell_ptr(j), i);
+              break;
+            default:
+              LOG(FATAL) << "Unexpected type: " << type;
+          }
+        }
+      }
+    }
+  }
+
+  static void SelectRandomRowsWithRate(RowBlock* block, double rate) {
+    CHECK_LE(rate, 1.0);
+    CHECK_GE(rate, 0.0);
+    auto select_count = block->nrows() * rate;
+    SelectionVector* select_vector = block->selection_vector();
+    if (rate == 1.0) {
+      select_vector->SetAllTrue();
+    } else if (rate == 0.0) {
+      select_vector->SetAllFalse();
+    } else {
+      vector<int> indexes(block->nrows());
+      std::iota(indexes.begin(), indexes.end(), 0);
+      std::random_shuffle(indexes.begin(), indexes.end());
+      indexes.resize(select_count);
+      select_vector->SetAllFalse();
+      for (auto index : indexes) {
+        select_vector->SetRowSelected(index);
+      }
+    }
+    CHECK_EQ(select_vector->CountSelected(), select_count);
+  }
+
+
+  // Use column_count to control the schema scale.
+  // Use select_rate to control the number of selected rows.
+  template<class Converter>
+  double RunBenchmark(const BenchmarkColumnsSpec& spec,
+                    double select_rate) {
+    ResetBenchmarkSchema(spec);
+    Arena arena(1024);
+    RowBlock block(&benchmark_schema_, 1000, &arena);
+    // Regardless of the config, use a constant number of selected cells for the test by
+    // looping the conversion an appropriate number of times.
+    const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
+    const int64_t kCellsPerBlock = block.nrows() * spec.columns.size();
+    const double kSelectedCellsPerBlock = kCellsPerBlock * select_rate;
+    const int kNumTrials = static_cast<int>(kNumCellsToConvert /  kSelectedCellsPerBlock);
+    FillRowBlockForBenchmark(spec, &block);
+    SelectRandomRowsWithRate(&block, select_rate);
+
+    int64_t cycle_start = CycleClock::Now();
+    for (int i = 0; i < kNumTrials; ++i) {
+      Converter::Run(block);
+    }
+    int64_t cycle_end = CycleClock::Now();
+    double cycles_per_cell = static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert;
+    LOG(INFO) << Substitute(
+        "Converting $0 to PB (method $3) row select rate $1: $2 cycles/cell",
+        spec.name, select_rate, cycles_per_cell,
+        Converter::kName);
+    return cycles_per_cell;
+  }
+
+ protected:
+  Schema benchmark_schema_;
+};
+
+TEST_P(WireProtocolBenchmark, TestRowBlockToPBBenchmark) {
+  const auto& spec = std::get<0>(GetParam());
   double select_rate = std::get<1>(GetParam());
-  RunBenchmark(column_count, select_rate);
+  double cycles_per_cell_rowwise = RunBenchmark<RowwiseConverter>(spec, select_rate);
+  double cycles_per_cell_columnar = RunBenchmark<ColumnarConverter>(spec, select_rate);
+  double ratio = cycles_per_cell_rowwise / cycles_per_cell_columnar;
+  LOG(INFO) << Substitute(
+      "Converting $0 to PB row select rate $1: columnar/rowwise throughput ratio: $2x",
+      spec.name, select_rate, ratio);
 }
 
-INSTANTIATE_TEST_CASE_P(ColumnarRowBlockToPBBenchmarkParams, WireProtocolTest,
-                        testing::Combine(testing::Values(3, 30, 300),
-                                         testing::Values(1.0, 0.8, 0.5, 0.2)));
+BenchmarkColumnsSpec UniformColumns(int n_cols, DataType type, double null_fraction) {
+  vector<BenchmarkColumnsSpec::Col> cols(n_cols);
+  for (int i = 0; i < n_cols; i++) {
+    cols[i] = {type, null_fraction};
+  }
+  string null_str;
+  if (null_fraction >= 0) {
+    null_str = Substitute("$0pct_null", static_cast<int>(null_fraction * 100));
+  } else {
+    null_str = "non_null";
+  }
+  return {cols, Substitute("$0_$1_$2",
+                           n_cols,
+                           GetTypeInfo(type)->name(),
+                           null_str) };
+}
+
+INSTANTIATE_TEST_CASE_P(
+    ColumnarRowBlockToPBBenchmarkParams, WireProtocolBenchmark,
+    testing::Combine(
+        testing::Values(
+            UniformColumns(10, INT64, -1),
+            UniformColumns(10, INT32, -1),
+            UniformColumns(10, STRING, -1),
+
+            UniformColumns(10, INT128, 0),
+            UniformColumns(10, INT64, 0),
+            UniformColumns(10, INT32, 0),
+            UniformColumns(10, INT16, 0),
+            UniformColumns(10, INT8, 0),
+            UniformColumns(10, STRING, 0),
+
+            UniformColumns(10, INT128, 0.1),
+            UniformColumns(10, INT64, 0.1),
+            UniformColumns(10, INT32, 0.1),
+            UniformColumns(10, INT16, 0.1),
+            UniformColumns(10, INT8, 0.1),
+            UniformColumns(10, STRING, 0.1)),
+        // Selection rates.
+        testing::Values(1.0, 0.8, 0.5, 0.2)),
+    [](const testing::TestParamInfo<WireProtocolBenchmark::ParamType>& info) {
+      return Substitute("$0_sel_$1pct",
+                        std::get<0>(info.param).name,
+                        static_cast<int>(std::get<1>(info.param)*100));
+    });
+
 
 // Test that trying to extract rows from an invalid block correctly returns
 // Corruption statuses.
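
For a concrete instance of the trial-count arithmetic in RunBenchmark() above, the sketch below plugs in the non-slow-test cell budget from the code together with one of the 10-column specs at a 0.5 selection rate; the specific numbers are illustrative.

// Worked example of the benchmark's trial-count computation (illustrative inputs):
constexpr int64_t kNumCellsToConvert = 1000000;      // non-slow-test budget
constexpr int kRowsPerBlock = 1000;
constexpr int kNumColumns = 10;                      // all specs above use 10 columns
constexpr double kSelectRate = 0.5;
constexpr double kSelectedCellsPerBlock =
    kRowsPerBlock * kNumColumns * kSelectRate;                       // 5000 cells
constexpr int kNumTrials =
    static_cast<int>(kNumCellsToConvert / kSelectedCellsPerBlock);   // 200 conversions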
diff --git a/src/kudu/util/faststring.h b/src/kudu/util/faststring.h
index 1357eab..c7215f7 100644
--- a/src/kudu/util/faststring.h
+++ b/src/kudu/util/faststring.h
@@ -106,6 +106,16 @@ class faststring {
     ASAN_UNPOISON_MEMORY_REGION(data_, len_);
   }
 
+  // Resize to 'newsize'. In contrast to 'resize()', if this requires allocating a new
+  // backing array, the new capacity is rounded up in the same manner as if data had been
+  // appended to the buffer.
+  void resize_with_extra_capacity(size_t newsize) {
+    if (newsize > capacity_) {
+      GrowToAtLeast(newsize);
+    }
+    resize(newsize);
+  }
+
   // Releases the underlying array; after this, the buffer is left empty.
   //
   // NOTE: the data pointer returned by release() always points to dynamically
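
The new resize_with_extra_capacity() exists so that repeatedly growing a column buffer block by block, as CopySelectedCellsFromColumn() does above, reallocates with amortized capacity growth rather than to the exact new size each time. A small sketch of that usage pattern follows; the append helper is hypothetical.

#include <cstddef>
#include <cstdint>
#include <cstring>

#include "kudu/util/faststring.h"

// Hypothetical append helper in the style of CopySelectedCellsFromColumn(): grow the
// destination with rounded-up capacity, then copy 'n' cells of 'type_size' bytes each.
void AppendCells(kudu::faststring* dst, const uint8_t* src, int n, size_t type_size) {
  size_t old_size = dst->size();
  dst->resize_with_extra_capacity(old_size + n * type_size);
  memcpy(dst->data() + old_size, src, n * type_size);
}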

