kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [1/2] kudu git commit: Allow to pad UNIXTIME_MICROS slots in scan results
Date Thu, 27 Apr 2017 19:43:15 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 1edaee01f -> bcb08fcd9


Allow to pad UNIXTIME_MICROS slots in scan results

This changes the wire protocol to, upon request, pad
the slots that contain values for UNIXTIME_MICROS columns
by 8 bytes. This allows Impala to adopt the result of a
Kudu scan as a whole while still having space to transform
UNIXTIME_MICROS values, which occupy 8 bytes, to the Impala
representation of timestamp, which occupies 16, in place
and without copying memory.

This patch doesn't include any de-serialization logic,
the reasoning being that Impala will have knowledge of
the data format in order to perform de-serialization
directly on the "raw" direct and indirect data.

This patch doesn't introduce any branching in the
serialization path. It does move the memset() call,
which was performed once per nullable column, per row,
so that it is performed once on the whole block instead.
While less cache friendly, this is also executed fewer
times. The net gain is not significant, but it does not
regress in the normal case.

Results of running the wire_protocol-test benchmark
in slow mode:

Before (avg 3 runs): 3.076
After  (avg 3 runs): 3.000

The difference is around -2%, which may be within the noise,
but it demonstrates that there is no significant regression.

Change-Id: I99fc6d3be089d19ebe2e70c938f2405c381578b4
Reviewed-on: http://gerrit.cloudera.org:8080/6623
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/89f39c59
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/89f39c59
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/89f39c59

Branch: refs/heads/master
Commit: 89f39c59e09a5abcdf8a28dacb468ca11b048fde
Parents: 1edaee0
Author: David Alves <dralves@apache.org>
Authored: Wed Apr 12 22:30:46 2017 -0700
Committer: David Ribeiro Alves <davidralves@gmail.com>
Committed: Thu Apr 27 19:30:39 2017 +0000

----------------------------------------------------------------------
 src/kudu/common/wire_protocol-test.cc |  93 +++++++++++++++++++
 src/kudu/common/wire_protocol.cc      | 138 ++++++++++++++++++++---------
 src/kudu/common/wire_protocol.h       |  15 +++-
 3 files changed, 199 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/89f39c59/src/kudu/common/wire_protocol-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 4ade664..83a966e 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/hexdump.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
@@ -227,6 +228,98 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) {
   }
 }
 
+// Create a block of rows in columnar layout and ensure that it can be
+// converted to and from protobuf.
+TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
+  int kNumRows = 10;
+  Arena arena(1024, 1024 * 1024);
+  // Create a schema with multiple UNIXTIME_MICROS columns in different
+  // positions.
+  Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS),
+                         ColumnSchema("col1", STRING),
+                         ColumnSchema("col2", UNIXTIME_MICROS),
+                         ColumnSchema("col3", INT32, true /* nullable */),
+                         ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1);
+  RowBlock block(tablet_schema, kNumRows, &arena);
+  block.selection_vector()->SetAllTrue();
+
+  for (int i = 0; i < block.nrows(); i++) {
+    RowBlockRow row = block.row(i);
+
+    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(0)) = i;
+    Slice col1;
+    // See: FillRowBlockWithTestRows() for the reason why we relocate these
+    // to 'test_data_arena_'.
+    CHECK(test_data_arena_.RelocateSlice("hello world col1", &col1));
+    *reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)) = col1;
+    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(2)) = i;
+    *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i;
+    row.cell(3).set_null(false);
+    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i;
+    row.cell(4).set_null(true);
+  }
+
+  // Have the projection schema have columns in a different order from the table schema.
+  Schema proj_schema({ ColumnSchema("col1", STRING),
+                       ColumnSchema("key",  UNIXTIME_MICROS),
+                       ColumnSchema("col2", UNIXTIME_MICROS),
+                       ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */),
+                       ColumnSchema("col3", INT32, true /* nullable */)}, 0);
+
+  // Convert to PB.
+  RowwiseRowBlockPB pb;
+  faststring direct, indirect;
+  SerializeRowBlock(block, &pb, &proj_schema, &direct, &indirect, true /*
pad timestamps */);
+  SCOPED_TRACE(SecureDebugString(pb));
+  SCOPED_TRACE("Row data: " + HexDump(direct));
+  SCOPED_TRACE("Indirect data: " + HexDump(indirect));
+
+  // Convert back to a row, ensure that the resulting row is the same
+  // as the one we put in. Can't reuse the decoding methods since we
+  // won't support decoding padded rows within Kudu.
+  vector<const uint8_t*> row_ptrs;
+  Slice direct_sidecar = direct;
+  Slice indirect_sidecar = indirect;
+  ASSERT_OK(RewriteRowBlockPointers(proj_schema, pb, indirect_sidecar, &direct_sidecar,
true));
+
+  // Row stride is the normal size for the schema + the number of UNIXTIME_MICROS columns
* 8,
+  // the size of the padding per column.
+  size_t row_stride = ContiguousRowHelper::row_size(proj_schema) + 3 * 8;
+  ASSERT_EQ(direct_sidecar.size(), row_stride * kNumRows);
+  const uint8_t* base_data;
+  for (int i = 0; i < kNumRows; i++) {
+    base_data = direct_sidecar.data() + i * row_stride;
+    // With padding, the null bitmap is at offset 68.
+    // See the calculations below to understand why.
+    const uint8_t* null_bitmap = base_data + 68;
+
+    // 'col1' comes at 0 bytes offset in the projection schema.
+    const Slice* col1 = reinterpret_cast<const Slice*>(base_data);
+    ASSERT_EQ(col1->compare(Slice("hello world col1")), 0) << "Unexpected val for
the "
+                                                           << i << "th row:"
+                                                           << col1->ToDebugString();
+    // 'key' comes at 16 bytes offset.
+    const int64_t key = *reinterpret_cast<const int64_t*>(base_data + 16);
+    EXPECT_EQ(key, i);
+
+    // 'col2' comes at 32 bytes offset: 16 bytes previous, 16 bytes 'key'
+    const int64_t col2 = *reinterpret_cast<const int64_t*>(base_data + 32);
+    EXPECT_EQ(col2, i);
+
+    // 'col4' is supposed to be null, but should also read 0 since we memsetted the
+    // memory to 0. It should come at 48 bytes offset:  32 bytes previous + 8 bytes 'col2'
+
+    // 8 bytes padding.
+    const int64_t col4 = *reinterpret_cast<const int64_t*>(base_data + 48);
+    EXPECT_EQ(col4, 0);
+    EXPECT_TRUE(BitmapTest(null_bitmap, 3));
+
+    // 'col3' comes at 64 bytes offset: 48 bytes previous, 8 bytes 'col4', 8 bytes padding
+    const int32_t col3 = *reinterpret_cast<const int32_t*>(base_data + 64);
+    EXPECT_EQ(col3, i);
+    EXPECT_FALSE(BitmapTest(null_bitmap, 4));
+  }
+}
+
 #ifdef NDEBUG
 TEST_F(WireProtocolTest, TestColumnarRowBlockToPBBenchmark) {
   Arena arena(1024, 1024 * 1024);

http://git-wip-us.apache.org/repos/asf/kudu/blob/89f39c59/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index fd61231..62a20a5 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -474,18 +474,32 @@ Status ColumnPredicateFromPB(const Schema& schema,
 // with the extra verifications.
 ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
 Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB& rowblock_pb,
-                               const Slice& indirect_data_slice, Slice* row_data_slice)
{
-  // TODO: cheating here so we can rewrite the request as it arrived and
+                               const Slice& indirect_data_slice, Slice* row_data_slice,
+                               bool pad_unixtime_micros_to_16_bytes) {
+  // TODO(todd): cheating here so we can rewrite the request as it arrived and
   // change any indirect data pointers back to "real" pointers instead of
   // on-the-wire pointers. Maybe the RPC layer should give us a non-const
   // request? Maybe we should suck it up and copy the data when we mutate?
 
+  size_t total_padding = 0;
+  // If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
+  // size to adjust the row_stride.
+  if (pad_unixtime_micros_to_16_bytes) {
+    for (int i = 0; i < schema.num_columns(); i++) {
+      if (schema.column(i).type_info()->type() == UNIXTIME_MICROS) {
+        total_padding += 8;
+      }
+    }
+  }
+
+  size_t row_stride = ContiguousRowHelper::row_size(schema) + total_padding;
+
   // We don't need a const-cast because we can just use Slice's lack of
   // const-safety.
   uint8_t* row_data = row_data_slice->mutable_data();
   const uint8_t* indir_data = indirect_data_slice.data();
-  size_t row_size = ContiguousRowHelper::row_size(schema);
-  size_t expected_data_size = rowblock_pb.num_rows() * row_size;
+  size_t expected_data_size = rowblock_pb.num_rows() * row_stride;
+  size_t null_bitmap_offset = schema.byte_size() + total_padding;
 
   if (PREDICT_FALSE(row_data_slice->size() != expected_data_size)) {
     return Status::Corruption(
@@ -493,22 +507,29 @@ Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB&
ro
                    row_data_slice->size(), expected_data_size, rowblock_pb.num_rows()));
   }
 
+  size_t padding_so_far = 0;
   for (int i = 0; i < schema.num_columns(); i++) {
     const ColumnSchema& col = schema.column(i);
+    if (col.type_info()->type() == UNIXTIME_MICROS && pad_unixtime_micros_to_16_bytes)
{
+      padding_so_far += 8;
+      continue;
+    }
     if (col.type_info()->physical_type() != BINARY) {
       continue;
     }
 
+    size_t column_offset = schema.column_offset(i) + padding_so_far;
+
     int row_idx = 0;
-    size_t offset = 0;
-    while (offset < row_data_slice->size()) {
-      ContiguousRow row(&schema, &row_data[offset]);
-      uint8_t* dst_cell = row.mutable_cell_ptr(i);
+    size_t row_offset = 0;
+    while (row_offset < row_data_slice->size()) {
+      uint8_t* row_ptr = row_data + row_offset;
+      uint8_t* cell_ptr = row_ptr + column_offset;
 
-      if (!col.is_nullable() || !row.is_null(i)) {
+      if (!col.is_nullable() || !BitmapTest(row_ptr + null_bitmap_offset, i)) {
         // The pointer is currently an offset into indir_data. Need to replace it
         // with the actual pointer into indir_data
-        Slice *slice = reinterpret_cast<Slice *>(dst_cell);
+        Slice *slice = reinterpret_cast<Slice *>(cell_ptr);
         size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
 
         // Ensure the updated pointer is within the bounds of the indirect data.
@@ -525,7 +546,7 @@ Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB&
ro
       }
 
       // Advance to next row
-      offset += row_size;
+      row_offset += row_stride;
       row_idx++;
     }
   }
@@ -614,20 +635,18 @@ void AppendRowToString<RowBlockRow>(const RowBlockRow& row,
string* buf) {
 // be copied to column 'dst_col_idx' in the output protobuf; otherwise,
 // dst_col_idx must be equal to col_idx.
 template<bool IS_NULLABLE, bool IS_VARLEN>
-static void CopyColumn(const RowBlock& block, int col_idx,
-                       int dst_col_idx, uint8_t* dst_base,
-                       faststring* indirect_data, const Schema* dst_schema) {
+static void CopyColumn(const RowBlock& block, int col_idx, int dst_col_idx, uint8_t*
dst_base,
+                       faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
+                       size_t schema_byte_size, size_t column_offset) {
   DCHECK(dst_schema);
   ColumnBlock cblock = block.column_block(col_idx);
-  size_t row_stride = ContiguousRowHelper::row_size(*dst_schema);
-  uint8_t* dst = dst_base + dst_schema->column_offset(dst_col_idx);
-  size_t offset_to_null_bitmap = dst_schema->byte_size() - dst_schema->column_offset(dst_col_idx);
+  uint8_t* dst = dst_base + column_offset;
+  size_t offset_to_null_bitmap = schema_byte_size - column_offset;
 
   size_t cell_size = cblock.stride();
   const uint8_t* src = cblock.cell_ptr(0);
 
-  BitmapIterator selected_row_iter(block.selection_vector()->bitmap(),
-                                   block.nrows());
+  BitmapIterator selected_row_iter(block.selection_vector()->bitmap(), block.nrows());
   int run_size;
   bool selected;
   int row_idx = 0;
@@ -639,13 +658,11 @@ static void CopyColumn(const RowBlock& block, int col_idx,
     }
     for (int i = 0; i < run_size; i++) {
       if (IS_NULLABLE && cblock.is_null(row_idx)) {
-        memset(dst, 0, cell_size);
         BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, true);
       } else if (IS_VARLEN) {
         const Slice *slice = reinterpret_cast<const Slice *>(src);
         size_t offset_in_indirect = indirect_data->size();
-        indirect_data->append(reinterpret_cast<const char*>(slice->data()),
-                              slice->size());
+        indirect_data->append(reinterpret_cast<const char*>(slice->data()), slice->size());
 
         Slice *dst_slice = reinterpret_cast<Slice *>(dst);
         *dst_slice = Slice(reinterpret_cast<const uint8_t*>(offset_in_indirect),
@@ -669,9 +686,12 @@ static void CopyColumn(const RowBlock& block, int col_idx,
 // Because we use a faststring here, ASAN tests become unbearably slow
 // with the extra verifications.
 ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
-void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb,
+void SerializeRowBlock(const RowBlock& block,
+                       RowwiseRowBlockPB* rowblock_pb,
                        const Schema* projection_schema,
-                       faststring* data_buf, faststring* indirect_data) {
+                       faststring* data_buf,
+                       faststring* indirect_data,
+                       bool pad_unixtime_micros_to_16_bytes) {
   DCHECK_GT(block.nrows(), 0);
   const Schema& tablet_schema = block.schema();
 
@@ -679,19 +699,45 @@ void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB*
rowblock_pb,
     projection_schema = &tablet_schema;
   }
 
-  size_t old_size = data_buf->size();
-  size_t row_stride = ContiguousRowHelper::row_size(*projection_schema);
-  int num_rows = block.selection_vector()->CountSelected();
-  data_buf->resize(old_size + row_stride * num_rows);
-  uint8_t* base = reinterpret_cast<uint8_t*>(&(*data_buf)[old_size]);
-
-  size_t proj_schema_idx = 0;
-  for (int t_schema_idx = 0; t_schema_idx < tablet_schema.num_columns(); t_schema_idx++)
{
-    const ColumnSchema& col = tablet_schema.column(t_schema_idx);
-    proj_schema_idx = projection_schema->find_column(col.name());
-    if (proj_schema_idx == -1) {
-      continue;
+  // Check whether we need to pad or if there are nullable columns, this will dictate whether
+  // we need to set memory to zero.
+  size_t total_padding = 0;
+  bool has_nullable_cols = false;
+  for (int i = 0; i < projection_schema->num_columns(); i++) {
+    if (projection_schema->column(i).is_nullable()) {
+      has_nullable_cols = true;
+    }
+    // If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
+    // size to adjust the row_stride.
+    if (pad_unixtime_micros_to_16_bytes &&
+        projection_schema->column(i).type_info()->type() == UNIXTIME_MICROS) {
+      total_padding += 8;
     }
+  }
+
+  size_t old_size = data_buf->size();
+  size_t row_stride = ContiguousRowHelper::row_size(*projection_schema) + total_padding;
+  size_t num_rows = block.selection_vector()->CountSelected();
+  size_t schema_byte_size = projection_schema->byte_size() + total_padding;
+  size_t additional_size = row_stride * num_rows;
+
+  data_buf->resize(old_size + additional_size);
+  uint8_t* base = &(*data_buf)[old_size];
+
+  // Zero out the memory if we have nullable columns or if we're padding slots so that we
don't leak
+  // unrelated data to the client.
+  if (total_padding != 0 || has_nullable_cols) {
+    memset(base, 0, additional_size);
+  }
+
+  size_t t_schema_idx = 0;
+  size_t padding_so_far = 0;
+  for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++)
{
+    const ColumnSchema& col = projection_schema->column(p_schema_idx);
+    t_schema_idx = tablet_schema.find_column(col.name());
+    DCHECK_NE(t_schema_idx, -1);
+
+    size_t column_offset = projection_schema->column_offset(p_schema_idx) + padding_so_far;
 
     // Generating different functions for each of these cases makes them much less
     // branch-heavy -- we do the branch once outside the loop, and then have a
@@ -700,20 +746,24 @@ void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB*
rowblock_pb,
     // even bigger gains, since we could inline the constant cell sizes and column
     // offsets.
     if (col.is_nullable() && col.type_info()->physical_type() == BINARY) {
-      CopyColumn<true, true>(block, t_schema_idx, proj_schema_idx, base, indirect_data,
-                             projection_schema);
+      CopyColumn<true, true>(block, t_schema_idx, p_schema_idx, base, indirect_data,
+                             projection_schema, row_stride, schema_byte_size, column_offset);
     } else if (col.is_nullable() && col.type_info()->physical_type() != BINARY)
{
-      CopyColumn<true, false>(block, t_schema_idx, proj_schema_idx, base, indirect_data,
-                              projection_schema);
+      CopyColumn<true, false>(block, t_schema_idx, p_schema_idx, base, indirect_data,
+                              projection_schema, row_stride, schema_byte_size, column_offset);
     } else if (!col.is_nullable() && col.type_info()->physical_type() == BINARY)
{
-      CopyColumn<false, true>(block, t_schema_idx, proj_schema_idx, base, indirect_data,
-                              projection_schema);
+      CopyColumn<false, true>(block, t_schema_idx, p_schema_idx, base, indirect_data,
+                              projection_schema, row_stride, schema_byte_size, column_offset);
     } else if (!col.is_nullable() && col.type_info()->physical_type() != BINARY)
{
-      CopyColumn<false, false>(block, t_schema_idx, proj_schema_idx, base, indirect_data,
-                               projection_schema);
+      CopyColumn<false, false>(block, t_schema_idx, p_schema_idx, base, indirect_data,
+                               projection_schema, row_stride, schema_byte_size, column_offset);
     } else {
       LOG(FATAL) << "cannot reach here";
     }
+
+    if (col.type_info()->type() == UNIXTIME_MICROS && pad_unixtime_micros_to_16_bytes)
{
+      padding_so_far += 8;
+    }
   }
   rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/89f39c59/src/kudu/common/wire_protocol.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h
index 6448282..d771ab8 100644
--- a/src/kudu/common/wire_protocol.h
+++ b/src/kudu/common/wire_protocol.h
@@ -123,18 +123,27 @@ Status ColumnPredicateFromPB(const Schema& schema,
 // If 'client_projection_schema' is not NULL, then only columns specified in
 // 'client_projection_schema' will be projected to 'data_buf'.
 //
+// If 'pad_unixtime_micros_to_16_bytes' is true, UNIXTIME_MICROS slots in the projection
+// schema will be padded to the right by 8 (zero'd) bytes for a total of 16 bytes.
+//
 // Requires that block.nrows() > 0
 void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb,
-                       const Schema* client_projection_schema,
-                       faststring* data_buf, faststring* indirect_data);
+                       const Schema* projection_schema,
+                       faststring* data_buf, faststring* indirect_data,
+                       bool pad_unixtime_micros_to_16_bytes = false);
 
 // Rewrites the data pointed-to by row data slice 'row_data_slice' by replacing
 // relative indirect data pointers with absolute ones in 'indirect_data_slice'.
 // At the time of this writing, this rewriting is only done for STRING types.
 //
+// If 'pad_unixtime_micros_to_16_bytes' is true, this function will take padding into
+// account when rewriting the block pointers.
+// See: SerializeRowBlock() for the actual format.
+//
 // Returns a bad Status if the provided data is invalid or corrupt.
 Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB& rowblock_pb,
-                               const Slice& indirect_data_slice, Slice* row_data_slice);
+                               const Slice& indirect_data_slice, Slice* row_data_slice,
+                               bool pad_unixtime_micros_to_16_bytes = false);
 
 // Extract the rows stored in this protobuf, which must have exactly the
 // given Schema. This Schema may be obtained using ColumnPBsToSchema.


Mime
View raw message