impala-reviews mailing list archives

From "Dan Hecht (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Mon, 06 Nov 2017 21:58:30 GMT
Dan Hecht has posted comments on this change. ( )

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 8:

File be/src/runtime/krpc-data-stream-mgr.h:
PS8, Line 435: a request
'num_request' requests
File be/src/runtime/
PS8, Line 62: 10000
how was that chosen? do we have a test case that causes this queue to fill up?
PS8, Line 109:       // Transfer the early senders into 'deferred_rpcs_' queue of the corresponding
             :       // sender queue. This makes sure new incoming RPCs won't pass these early
             :       // leading to starvation.
this comment seems out of place. this is more an implementation detail of the receiver and is
handled properly inside ProcessEarlySender(). You could incorporate this in the comment for
ProcessEarlySender() (to motivate why it uses the deferred_rpcs_ queue).
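To illustrate the motivation for routing early senders through deferred_rpcs_, here is a minimal single-threaded sketch. Everything in it is hypothetical (the class, the string "RPCs", the capacity counted in batches); it only assumes the rule that an incoming RPC may skip the deferred queue when that queue is empty and there is space. Because early senders always land in the deferred queue, later RPCs see a non-empty queue and line up behind them instead of overtaking them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical, simplified sender queue. An "RPC" is just a string label here;
// no locking, since this sketch is single-threaded.
class SenderQueueSketch {
 public:
  explicit SenderQueueSketch(std::size_t capacity) : capacity_(capacity) {}

  // A new RPC may go straight to the batch queue only if nothing is already
  // waiting in 'deferred_rpcs_'; otherwise it would overtake older RPCs.
  void AddBatch(const std::string& rpc) {
    if (!deferred_rpcs_.empty() || batch_queue_.size() >= capacity_) {
      deferred_rpcs_.push_back(rpc);
    } else {
      batch_queue_.push_back(rpc);
    }
  }

  // Early senders are always appended to the deferred queue, so any RPC that
  // arrives afterwards is queued behind them.
  void ProcessEarlySender(const std::string& rpc) { deferred_rpcs_.push_back(rpc); }

  // Moves deferred RPCs into free space in FIFO order, then returns the oldest
  // batch. Returns an empty string if nothing is queued.
  std::string GetBatch() {
    Drain();
    if (batch_queue_.empty()) return "";
    std::string batch = batch_queue_.front();
    batch_queue_.pop_front();
    Drain();
    return batch;
  }

 private:
  void Drain() {
    while (!deferred_rpcs_.empty() && batch_queue_.size() < capacity_) {
      batch_queue_.push_back(deferred_rpcs_.front());
      deferred_rpcs_.pop_front();
    }
  }

  const std::size_t capacity_;
  std::deque<std::string> batch_queue_;
  std::deque<std::string> deferred_rpcs_;
};
```

If AddBatch() checked only capacity, "new1" in the test below would be returned before both early senders.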
File be/src/runtime/krpc-data-stream-recvr.h:
PS8, Line 129: .
and start a deserialization task to process it asynchronously.
PS8, Line 130: Transfer
this "transfer" is in the opposite direction of how our "Transfer" methods usually go (e.g.
src->TransferResourcesOwnership(dest)). Maybe call this ProcessEarlySender() (though I
don't love "process" either since it's so vague).
PS8, Line 197: time
cpu time or wall time?
PS8, Line 200: time
same question
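The distinction matters for time spent blocked: waiting for a batch advances wall-clock time but consumes almost no CPU time. A small sketch, using only the standard library (the function and struct names are made up for illustration, and `std::clock()` measures process CPU time on POSIX systems but not on every platform):

```cpp
#include <cassert>
#include <chrono>
#include <ctime>
#include <thread>

// Hypothetical helper: elapsed wall-clock vs. process CPU time in seconds.
struct Elapsed {
  double wall_seconds;
  double cpu_seconds;
};

// Measures both clocks around a blocking wait. The sleep stands in for a
// receiver blocked on a condition variable until data arrives.
Elapsed MeasureBlockingWait(std::chrono::milliseconds wait) {
  const auto wall_start = std::chrono::steady_clock::now();
  const std::clock_t cpu_start = std::clock();
  std::this_thread::sleep_for(wait);
  const std::clock_t cpu_end = std::clock();
  const auto wall_end = std::chrono::steady_clock::now();
  return {std::chrono::duration<double>(wall_end - wall_start).count(),
          static_cast<double>(cpu_end - cpu_start) / CLOCKS_PER_SEC};
}
```

A timer documented only as "time" could report either quantity, which is why the counter comment should say which one it is.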
File be/src/runtime/
PS8, Line 67: data
the resources
PS8, Line 72: 'payload'
RPC state
PS8, Line 73: deferred_rpcs_
we shouldn't normally refer to private fields in public class comments, but given this is
an internal class to the recvr, we can leave this.
PS8, Line 87:   void TransferEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
same comment as recvr header comment.
PS8, Line 195: while (current_batch_.get() == nullptr) {
I don't think we need this loop. see other comments in this function.
PS8, Line 197: !is_cancelled_ && batch_queue_.empty()
nit: consider swapping the order of these, both so that the fast case (!batch_queue_.empty())
comes first and so that the order matches the comment ("or we know we're done" corresponds to
the is_cancelled_ and num_remaining_senders_ == 0 cases).
PS8, Line 201:       CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(
             :           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
             :           &is_cancelled_);
there's got to be a cleaner way to do this but ignore for now
PS8, Line 210: 
nit: i think we could do without some of the blank lines in this method to make more code
fit on a screen
PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty()
why not write this condition as:
num_remaining_senders_ == 0

then it's clearer that these three conditions correspond to the loop exit conditions.
PS8, Line 218: !batch_queue_.empty()
given that we just checked the other two loop exit conditions, isn't this definitely true?
i.e. we don't need this guard it seems.
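One way the loop-free version could look, as a sketch only: the class is hypothetical, `std::mutex`/`std::condition_variable` stand in for whatever lock and CV the receiver actually uses, and deferred_rpcs_ and the timers are left out. The wait predicate lists the fast case first, then the two "we're done" cases, and once those are ruled out the dequeue needs no extra guard.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, simplified sender queue (not the patch's code).
class GetBatchSketch {
 public:
  explicit GetBatchSketch(int num_senders) : num_remaining_senders_(num_senders) {}

  void AddBatch(std::string batch) {
    { std::lock_guard<std::mutex> l(lock_); batch_queue_.push_back(std::move(batch)); }
    data_arrival_cv_.notify_one();
  }
  void SenderDone() {
    { std::lock_guard<std::mutex> l(lock_); --num_remaining_senders_; }
    data_arrival_cv_.notify_all();
  }
  void Cancel() {
    { std::lock_guard<std::mutex> l(lock_); is_cancelled_ = true; }
    data_arrival_cv_.notify_all();
  }

  // Returns the next batch, or std::nullopt on cancellation or end of stream.
  std::optional<std::string> GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    // No outer loop: wait() re-checks the predicate after every wakeup.
    data_arrival_cv_.wait(l, [this] {
      return !batch_queue_.empty() || is_cancelled_ || num_remaining_senders_ == 0;
    });
    if (is_cancelled_) return std::nullopt;
    if (num_remaining_senders_ == 0 && batch_queue_.empty()) return std::nullopt;
    // Both "done" conditions are ruled out, so a batch must be available.
    assert(!batch_queue_.empty());
    std::string batch = std::move(batch_queue_.front());
    batch_queue_.pop_front();
    return batch;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::string> batch_queue_;
  int num_remaining_senders_;
  bool is_cancelled_ = false;
};
```

Batches that arrived before the last sender finished are still drained, because the end-of-stream check also requires an empty queue.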
PS8, Line 227: .
now that there might be space or the batch queue might be empty.
PS8, Line 229: .
to parallelize the CPU bound deserialization work.
PS8, Line 230:       // No point in dequeuing more than number of deserialization threads
true, though this doesn't quite make sense given that the thread pool is shared across all
recvrs. but i guess it's an upper bound.
PS8, Line 239:         l.unlock();
once you get rid of the loop, I think you'll be able to eliminate this unlock/lock/unlock
and just drop the lock (via scope), which then also makes this easier to reason about.
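A sketch of the scope-based pattern, under hypothetical names: deferred RPCs are plain ints, `submit` stands in for handing a deserialization task to the shared pool, and `num_threads_` is the upper bound mentioned above. The dequeue happens under a `std::lock_guard` in an inner scope, and the tasks are submitted after that scope ends, so there are no manual unlock()/lock() pairs to reason about.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical sketch; not the patch's classes or thread-pool API.
class DeferredDispatcherSketch {
 public:
  explicit DeferredDispatcherSketch(std::size_t num_deserialize_threads)
      : num_threads_(num_deserialize_threads) {}

  void Defer(int rpc) {
    std::lock_guard<std::mutex> l(lock_);
    deferred_rpcs_.push_back(rpc);
  }

  // Dequeues at most 'num_threads_' deferred RPCs while holding the lock, then
  // submits them after the lock is released. Returns how many were submitted.
  std::size_t DispatchDeferred(const std::function<void(int)>& submit) {
    std::vector<int> to_process;
    {
      std::lock_guard<std::mutex> l(lock_);
      // Only an upper bound: the pool may be shared with other receivers.
      const std::size_t n = std::min(num_threads_, deferred_rpcs_.size());
      for (std::size_t i = 0; i < n; ++i) {
        to_process.push_back(deferred_rpcs_.front());
        deferred_rpcs_.pop_front();
      }
    }  // 'lock_' is released here when 'l' goes out of scope.
    for (int rpc : to_process) submit(rpc);
    return to_process.size();
  }

  std::size_t NumDeferred() {
    std::lock_guard<std::mutex> l(lock_);
    return deferred_rpcs_.size();
  }

 private:
  std::mutex lock_;
  const std::size_t num_threads_;
  std::deque<int> deferred_rpcs_;
};
```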
PS8, Line 389:     deferred_rpcs_.pop();
at this point, 'ctx' effectively takes ownership, right? we should add a comment that says
that and that says we cannot return without "delete ctx".

Even better would be to move the front() into a temp unique_ptr in the function scope. Or
you could always pop by moving into a unique_ptr and re-push_front() if you find there is
no space. But the main point is it'd be nice to use a smart pointer to ensure we don't return
and leak.
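A sketch of the "pop into a unique_ptr, re-push_front() if there is no space" variant. The context struct and the byte limit are hypothetical stand-ins for TransmitDataCtx and the receiver's buffer limit, and the sketch assumes deferred_rpcs_ is a container with push_front(), such as `std::deque`; `std::queue` does not provide it.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for TransmitDataCtx.
struct TransmitDataCtxSketch {
  explicit TransmitDataCtxSketch(std::string p) : payload(std::move(p)) {}
  std::string payload;
};

class DeferredQueueSketch {
 public:
  explicit DeferredQueueSketch(std::size_t byte_limit) : byte_limit_(byte_limit) {}

  void Defer(std::unique_ptr<TransmitDataCtxSketch> ctx) {
    deferred_rpcs_.push_back(std::move(ctx));
  }

  // Pops the front RPC into a local unique_ptr so ownership is explicit. On
  // every return path the context is either handed back to the queue or
  // destroyed automatically, so an early return cannot leak it.
  bool TryDequeueDeferredRpc() {
    if (deferred_rpcs_.empty()) return false;
    std::unique_ptr<TransmitDataCtxSketch> ctx = std::move(deferred_rpcs_.front());
    deferred_rpcs_.pop_front();
    if (queued_bytes_ + ctx->payload.size() > byte_limit_) {
      deferred_rpcs_.push_front(std::move(ctx));  // no space: give ownership back
      return false;
    }
    queued_bytes_ += ctx->payload.size();
    // ... deserialize the batch and respond to the RPC here ...
    return true;  // 'ctx' is destroyed at the end of this scope
  }

  std::size_t num_deferred() const { return deferred_rpcs_.size(); }

 private:
  const std::size_t byte_limit_;
  std::size_t queued_bytes_ = 0;
  std::deque<std::unique_ptr<TransmitDataCtxSketch>> deferred_rpcs_;
};
```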
PS8, Line 406:     ++num_deserialize_tasks_pending_;
             :     COUNTER_ADD(recvr_->num_deferred_batches_, 1);
nit: let's reorder these two lines since num_deferred_batches_ is counting the number of things
that go into deferred_rpcs_ while num_deserialize_tasks_pending_ is counting the number
of tasks added to the DS mgr.

To view, visit
To unsubscribe, visit

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 8
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Comment-Date: Mon, 06 Nov 2017 21:58:30 +0000
Gerrit-HasComments: Yes
