impala-reviews mailing list archives

From "Dan Hecht (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Wed, 25 Oct 2017 17:19:38 GMT
Dan Hecht has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 3:


Some more comments, still going through.
File be/src/runtime/
PS3, Line 75: cached
see comment in row-batch.h about this terminology.
PS3, Line 89: // safe to free them once the callback has been invoked.
I think we should add a reference to KUDU-2011 somewhere here like:

Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client has
no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the right place,
except that's currently called too early).  RpcController::Cancel(), however, ensures that
the callback is called only after the RPC layer no longer references the sidecar buffer.
PS3, Line 92: query
query? does it mean fragment instance?
PS3, Line 124: Shutdown the RPC thread
is that still accurate?
PS3, Line 139:   int buffer_size_;
that could use a comment.
PS3, Line 141:   const TNetworkAddress address_;
             :   TUniqueId fragment_instance_id_;
             :   PlanNodeId dest_node_id_;
those could be commented together to say they identify the destination. it's a little odd
that plan node id is prefixed "dest" when the others are not.

it also seems weird that we need both these and the req_ field since shouldn't they just be
stored there?  Or seems we should get rid of the req_ and just generate it when sending.
PS3, Line 159: num_cached_proto_batches_
caps since constant?
also, can this ever be something other than 2 without rewriting the code? i.e. doesn't the code
assume this value is 2 in various ways?
PS3, Line 175: proxy
PS3, Line 197:   bool remote_recvr_closed_ = false;
why is that needed now? 
also, shouldn't we do something different, at a higher level, in that case (like cancel this
PS3, Line 200:   Status rpc_status_;
I think it would help to associate this with rpc_in_flight_ - move them adjacent and say that
rpc_status_ is valid only when rpc_in_flight_ is false, or something.
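A minimal sketch of that pairing, with the validity rule spelled out in the member comments. The class name RpcState and its methods are hypothetical, and std::string stands in for Impala's Status type (an empty string meaning OK); this is only an illustration of the suggested layout, not the patch's code.

```cpp
#include <cassert>
#include <string>

// Hypothetical holder for the two fields discussed above.
class RpcState {
 public:
  // Marks an RPC as outstanding. Only one RPC may be in flight at a time.
  void StartRpc() {
    assert(!rpc_in_flight_);
    rpc_in_flight_ = true;
  }

  // Records the result of the outstanding RPC and marks it as finished.
  void CompleteRpc(const std::string& status) {
    assert(rpc_in_flight_);
    rpc_status_ = status;
    rpc_in_flight_ = false;
  }

  bool rpc_in_flight() const { return rpc_in_flight_; }

  // Must not be called while an RPC is in flight.
  const std::string& status() const {
    assert(!rpc_in_flight_);
    return rpc_status_;
  }

 private:
  // True while an RPC is outstanding.
  bool rpc_in_flight_ = false;

  // Status of the most recently completed RPC.
  // Valid only when 'rpc_in_flight_' is false.
  std::string rpc_status_;
};
```

Keeping the two members adjacent lets the comment on rpc_status_ refer directly to the flag that guards it.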
PS3, Line 218: 2
if we have the constant, shouldn't that use it?
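A rough sketch of what using the constant everywhere could look like, so that no other line hard-codes 2. The class name Channel, the constant name NUM_OUTBOUND_BATCHES, and NextBatch() are hypothetical, and std::string stands in for whatever the batch type ends up being.

```cpp
#include <array>
#include <string>

// Hypothetical channel that rotates through a fixed set of outbound buffers.
class Channel {
 public:
  // Number of outbound batches kept per channel. Every use below derives from
  // this constant instead of repeating the literal.
  static constexpr int NUM_OUTBOUND_BATCHES = 2;

  // Returns the buffer to serialize into next and advances to the following one.
  std::string* NextBatch() {
    std::string* batch = &batches_[current_idx_];
    current_idx_ = (current_idx_ + 1) % NUM_OUTBOUND_BATCHES;
    return batch;
  }

 private:
  std::array<std::string, NUM_OUTBOUND_BATCHES> batches_;
  int current_idx_ = 0;
};
```

If the rest of the code really does depend on the value being exactly 2, a static_assert next to the constant would at least make that assumption explicit.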
PS3, Line 334:   auto pred = [this]() -> bool { return !rpc_in_flight_ || ShouldTerminate(); };
             :   auto timeout = std::chrono::system_clock::now() + milliseconds(50);
             :   while (!rpc_done_cv_.wait_until(*lock, timeout, pred)) {
             :     timeout = system_clock::now() + milliseconds(50);
             :   }
seems simpler to just write:

while (rpc_in_flight_ && !ShouldTerminate()) {
  auto timeout = std::chrono::system_clock::now() + milliseconds(50);
  rpc_done_cv_.wait_until(*lock, timeout);
}

or even better to use wait_for() which takes the relative timeout.
Or should we use our ConditionVariable wrapper? Especially if we want to start instrumenting
these things better. But if it's work to switch it over, it's okay to keep it condition_variable,
but let's at least make the code more straightforward.
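For reference, a self-contained sketch of the wait_for() variant using plain std::condition_variable. ChannelState and WaitForRpc() are hypothetical names, and the should_terminate flag stands in for ShouldTerminate(); the 50 ms poll interval is taken from the code quoted above.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the channel state discussed above.
struct ChannelState {
  std::mutex lock;
  std::condition_variable rpc_done_cv;
  bool rpc_in_flight = false;
  bool should_terminate = false;  // stands in for ShouldTerminate()
};

// Blocks until no RPC is in flight or termination has been requested.
// 'l' must already hold state->lock. Returns true if no RPC is in flight
// when the wait ends, false if the wait ended because of termination.
inline bool WaitForRpc(ChannelState* state, std::unique_lock<std::mutex>* l) {
  while (state->rpc_in_flight && !state->should_terminate) {
    // wait_for() takes a relative timeout, so there is no deadline to recompute.
    // The loop re-checks the condition after every wakeup, including spurious ones.
    state->rpc_done_cv.wait_for(*l, std::chrono::milliseconds(50));
  }
  return !state->rpc_in_flight;
}
```

The loop condition reads the same way as the prose ("wait while an RPC is in flight and we should not terminate"), which is the main readability gain over the predicate-plus-wait_until() form.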
File be/src/runtime/row-batch.h:
PS3, Line 52: struct ProtoRowBatch {
I think we should get rid of this structure altogether. IMO, the indirection just adds confusion.

On the receive side, it seems we can just get the header and sidecars directly from the request,
which is already threaded through the RPC handler anyway.  Pulling it into a ProtoRowBatch
just makes it unclear where the not yet deserialized rowbatch comes from.

On the send side, I think we should just work directly on CachedProtoRowBatch (but rename
that thing, see below). The indirection through the ProtoRowBatch pointers (aka Slice) makes
the lifetime and ownership harder to reason about.

I also found the name confusing since only the header is protobuf. The rest is KRPC specific.

Any reason we shouldn't eliminate this?
PS3, Line 89: CachedProtoRowBatch
what is "cached" about this?

How about calling this OutboundRowBatch, RpcRowBatch, or SerializedRowBatch?

The first name maybe is best since it can really only be used for outbound, it seems.

ProtoRowBatch seems unnecessary in this case too, since we can just create the Slice on-the-fly
when we want to send the OutboundRowBatch, no?
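A minimal sketch of that idea: the outbound batch owns its buffers, and the non-owning views are only built at send time. Everything here is hypothetical; ByteView is a stand-in for kudu::Slice (a pointer plus a length), and the member names are only illustrative.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical non-owning view; stands in for kudu::Slice.
struct ByteView {
  const uint8_t* data;
  size_t size;
};

// Hypothetical outbound batch that owns the serialized buffers.
struct OutboundRowBatch {
  std::string tuple_data;              // serialized (possibly compressed) tuples
  std::vector<int32_t> tuple_offsets;  // offsets into 'tuple_data'

  // Views are created on demand. They remain valid only as long as this
  // object is alive and its buffers are not modified.
  ByteView TupleDataView() const {
    return ByteView{reinterpret_cast<const uint8_t*>(tuple_data.data()),
        tuple_data.size()};
  }

  ByteView TupleOffsetsView() const {
    return ByteView{reinterpret_cast<const uint8_t*>(tuple_offsets.data()),
        tuple_offsets.size() * sizeof(int32_t)};
  }
};
```

With this shape there is a single owner for the bytes, so the lifetime question reduces to "keep the OutboundRowBatch alive until the RPC layer is done with the views".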
PS3, Line 431: 
             :   /// Overload for testing that allows the test to force the deduplication
             :   Status Serialize(TRowBatch* output_batch, bool full_dedup);
             :   /// Shared implementation between thrift and protobuf to serialize this row
             :   ///
             :   /// 'full_dedup': true if full deduplication is used.
             :   ///
             :   /// 'tuple_offsets': Updated to contain offsets of all tuples into 'tuple_data'
             :   /// return . There are a total of num_rows * num_tuples_per_row offsets. An offset
             :   /// of -1 records a NULL.
             :   ///
             :   /// 'tuple_data': Updated to hold the serialized tuples' data. If 'compression_type'
             :   /// is THdfsCompression::LZ4, this is LZ4 compressed.
             :   ///
             :   /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'.
             :   ///
             :   /// 'compression_type': Updated with the compression type applied on 'tuple_data'.
             :   /// THdfsCompression::NONE if there is no compression applied.
             :   ///
             :   /// Returns error status if serialization failed. Returns OK otherwise.
             :   Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, string*
             :       int64_t* uncompressed_size, THdfsCompression::type* compression_type);
             :   /// Shared implementation between thrift and protobuf to deserialize a row
             :   ///
             :   /// 'input_tuple_data': contains pointer and size of tuples' data buffer.
             :   /// If 'compression_type' is not THdfsCompression::NONE, tuple data is compressed.
             :   ///
             :   /// 'input_tuple_offsets': an int32_t array of tuples; offsets into 'input_tuple_data'.
             :   /// Used for populating the tuples in the row batch with actual pointers.
             :   ///
             :   /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's
             :   ///
             :   /// 'compression_type': If 'input_tuple_data' is compressed, it's the compression
             :   /// codec used.
             :   ///
             :   void Deserialize(const kudu::Slice& input_tuple_data,
             :       const kudu::Slice& input_tuple_offsets, int64_t uncompressed_size,
             :       THdfsCompression::type compression_type);
let's leave a TODO about cleaning this all up once we can remove the thrift implementation.
do we have a JIRA to do that (not for milestone 1)?
File be/src/service/
PS3, Line 64:   ProtoRowBatch batch;
            :   Status status = FromKuduStatus(context->GetInboundSidecar(
            :       request->row_batch_header().tuple_data_sidecar_idx(), &batch.tuple_data));
            :   if (status.ok()) {
            :     status = FromKuduStatus(context->GetInboundSidecar(
            :         request->row_batch_header().tuple_offsets_sidecar_idx(), &batch.tuple_offsets));
            :   }
            :   if (status.ok()) {
            :     batch.header = request->row_batch_header();
see comment in row-batch.h.  I think we should do this later when we actually want to deserialize
the row batch.  We have to keep the 'request' around until that time anyway, right?


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Reviewer: Tim Armstrong <>
Gerrit-Comment-Date: Wed, 25 Oct 2017 17:19:38 +0000
Gerrit-HasComments: Yes
