impala-reviews mailing list archives

From "Dan Hecht (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Fri, 03 Nov 2017 23:07:09 GMT
Dan Hecht has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 7:

File be/src/runtime/krpc-data-stream-mgr.h:
PS7, Line 102: the batch is added
             : /// to the receiver's 'deferred_batches_'
it's not really the batch added. and it's not just a single structure for the receiver (it
may go into one of many queues for merging exchange). So how about saying:
... the RPC state is saved into a deferred queue.
PS7, Line 104: from the pending sender queue
how about: ... from a deferred RPC queue and the row batch is deserialized.
PS7, Line 393: 
quick comment for why we define a move constructor and move operator=, since we don't typically
want to define those.
File be/src/runtime/
PS7, Line 199:   DeserializeTask payload =
             :       {DeserializeTaskType::EARLY_SENDERS, finst_id, dest_node_id, 0};
             :   deserialize_pool_.Offer(move(payload));
doesn't this mean we make early sender draining single threaded? should we instead use the
sender_id in this case as well and offer work per sender? or do we think this doesn't matter?
PS7, Line 217: already_unregistered
that shouldn't be possible in the DEFERRED_BATCHES case, right? so i'd probably move this
DCHECK into the cases below so you can tighten it up.
PS7, Line 235:     for (const unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs)
shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? Otherwise, if the same
sender is in both lists we'll process those RPCs out of order. I guess that can't really happen
given the current implementation of not responding to early RPCs and that senders only let
one in flight, but it still seems to make more sense to do it the other way around.
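
A minimal sketch of the ordering suggested above. Everything here is hypothetical: plain sender ids stand in for the real RPC context objects, and closed_sender_ctxs is assumed to hold end-of-stream notifications. It only illustrates that handling the waiting list first keeps a sender that appears in both lists in order.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in: sender ids replace the real RPC context objects.
struct EarlySendersSketch {
  std::vector<int> waiting_sender_ctxs;  // early RPCs that carry row batches
  std::vector<int> closed_sender_ctxs;   // assumed: early end-of-stream RPCs
};

// Returns a log of the order in which the early RPCs would be handled.
std::vector<std::string> ProcessEarlySenders(const EarlySendersSketch& early_senders) {
  std::vector<std::string> handled;
  // Data first, so a sender present in both lists is never closed before
  // its batch has been seen.
  for (int sender_id : early_senders.waiting_sender_ctxs) {
    handled.push_back("data:" + std::to_string(sender_id));
  }
  for (int sender_id : early_senders.closed_sender_ctxs) {
    handled.push_back("eos:" + std::to_string(sender_id));
  }
  return handled;
}
```

With sender 7 in both lists, the log shows its data entry ahead of its end-of-stream entry.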
PS7, Line 236: already_unregistered
why is this possible in the waiting_sender_ctxs case but not the closed_sender_ctxs case?
PS7, Line 247: already_unregistered
why is that possible?
PS7, Line 248: recvr->AddDeferredBatches(task.sender_id);
So I guess we no longer multithread within a single sender queue (and for non-merging, within
a single receiver) doing it this way. I think that's okay but was it intentional?
File be/src/runtime/krpc-data-stream-recvr.h:
PS7, Line 78: The caller must acquire data from the
            :   /// returned batch
is that talking about calling TransferAllResources(), or can the caller do it directly?
File be/src/runtime/
PS6, Line 127:   // If true, the receiver fragment for this stream got cancelled.
> For the non-merging case, there is essentially only one queue.
As mentioned elsewhere, I'm not totally convinced yet that this is the right way to do it
but, yes, we can think about it more and change it later if necessary.
File be/src/runtime/
PS7, Line 72: s
PS7, Line 77: Adds as many deferred batches as possible
hmm I'm still not convinced this is the right thing to do (in the merging case). It seems
like it's left up to chance as to the order that deferred batches are drained across the sender
queues. But we can think about this more and address it later.
PS7, Line 97: (1) 'batch_queue' is empty and there is no pending insertion
the HasSpace name seems wrong for condition (1). From the name HasSpace, I was expecting it
only to check for condition (2) because that tells us if there is still space.  How about
calling this CanEnqueue() or ShouldEnqueue()?
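
To make the naming point concrete, here is a rough sketch of a check that covers both conditions. The struct, its fields and the byte-limit arithmetic are assumptions made for illustration, not the code in the patch; only the two conditions themselves come from the comment under review.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a sender queue; all fields are illustrative.
struct SenderQueueSketch {
  std::deque<int64_t> batch_queue;  // sizes of batches already enqueued
  int num_pending_enqueue = 0;      // insertions reserved but not yet finished
  int64_t bytes_buffered = 0;       // bytes currently counted against the limit
  int64_t buffer_limit = 0;         // assumed byte limit for the receiver

  // True if a batch of 'batch_size' bytes may be enqueued right now.
  bool CanEnqueue(int64_t batch_size) const {
    // (1) An empty queue with no pending insertion always admits a batch,
    //     even over the limit, so a consumer waiting on it cannot stall.
    if (batch_queue.empty() && num_pending_enqueue == 0) return true;
    // (2) Otherwise admit the batch only if it still fits under the limit.
    return bytes_buffered + batch_size <= buffer_limit;
  }
};
```

Condition (1) is not about space at all, which is why a name like CanEnqueue() or ShouldEnqueue() describes the result better than HasSpace().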
PS7, Line 113: size
deserialized or serialized size?
PS7, Line 169: deferred batches
these aren't really deferred batches though. They are deferred RPCs (which do contain batches).
If you like the "deferred batches" terminology, I can live with it, but it seems somewhat
misleading (though not worse than "blocked senders").
PS7, Line 220:     // if we haven't notified them already.
rather than explaining what, we should explain why:
We've either removed a batch from the sender queue (and so now there might be space to move
in a deferred batch), or the sender queue is empty. In either case, try to process the deferred
batches and move them to the sender queue.

Though can the second case (empty case) even happen? Shouldn't we have already started this
process if the queue was empty.

That is, why is this so complicated? Why can't it just be:

(1) while batch_queue_.empty() wait for data arrival.
(2) Dequeue from batch queue and trigger deferred batch draining. 

i.e. that first while loop shouldn't need to care about the deferred_batches_ state. Whoever
made the batch_queue_ empty should be responsible for triggering the draining so we can just
wait for the arrival. And if there was nothing to drain, then the next batch to arrive would
immediately go into the batch_queue_ anyway.  So it seems like this can all be simplified,
am I missing something?
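
The two-step shape proposed above could look roughly like the sketch below. It is a simplified, hypothetical model: ints stand in for row batches and deferred RPCs, capacity is a count rather than a byte limit (and is assumed to be at least 1), and draining happens inline instead of being handed to a deserialization thread pool. None of the names are taken from the patch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical single-queue model of the simplified flow.
class QueueSketch {
 public:
  explicit QueueSketch(size_t capacity) : capacity_(capacity) {}

  // Arrival path: enqueue if nothing is deferred and there is room, else defer.
  void Add(int batch) {
    std::lock_guard<std::mutex> l(lock_);
    if (deferred_.empty() && batch_queue_.size() < capacity_) {
      batch_queue_.push_back(batch);
      data_arrival_cv_.notify_one();
    } else {
      deferred_.push_back(batch);
    }
  }

  int GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    // (1) Wait for data arrival only; the deferred state is not consulted here.
    data_arrival_cv_.wait(l, [this] { return !batch_queue_.empty(); });
    // (2) Dequeue, then let whoever freed the space trigger the draining.
    int batch = batch_queue_.front();
    batch_queue_.pop_front();
    DrainDeferredLocked();
    return batch;
  }

  size_t num_deferred() {
    std::lock_guard<std::mutex> l(lock_);
    return deferred_.size();
  }

 private:
  // Moves deferred entries into the queue while there is room. Caller holds lock_.
  void DrainDeferredLocked() {
    while (!deferred_.empty() && batch_queue_.size() < capacity_) {
      batch_queue_.push_back(deferred_.front());
      deferred_.pop_front();
    }
  }

  const size_t capacity_;
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<int> batch_queue_;
  std::deque<int> deferred_;
};
```

In this model an empty batch queue implies an empty deferred queue, because every dequeue drains before returning. That invariant is what lets step (1) ignore the deferred state.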
PS7, Line 223: deferred_batches_.front()->request->sender_id();
that seems misleading in the case of !merging. sender_id will be some random one, but we'll
funnel back into 0 anyway.  But I guess this sender queue doesn't know what its own sender
id is, so okay.
PS7, Line 224:       l.unlock();
why do we need to drop the lock?
PS7, Line 263:   COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
let's add a comment:
// Reserve queue space before dropping the lock.
PS7, Line 269: deserialize multiple batches in parallel.
I guess this only happens in the non-merging case, right?
PS7, Line 274: DCHECK_GT(num_pending_enqueue_, 0);
I'm not sure what this dcheck is meant to prove.
PS7, Line 322: starvation
starvation of a sender in the non-merging case.
PS7, Line 324: Note: It's important that we enqueue the new batch regardless of buffer limit
             :     // the queue is currently empty. In the case of a merging receiver, batches
             :     // received from a specific queue based on data order, and the pipeline will stall
             :     // if the merger is waiting for data from an empty queue that cannot be filled because
             :     // the limit has been reached.
that comment should be inside HasSpace() now.
PS7, Line 345: void KrpcDataStreamRecvr::SenderQueue::AddDeferredBatches() {
are we able to exercise the deferred batches path in functional testing? if not, I think we
should figure out a way.
PS7, Line 531:   // All the sender queues will be cancelled after this call returns.
is that accurate?
File be/src/runtime/
PS6, Line 369:     // the need to manage yet another thread pool.
> Mostly to avoid the complexity of managing yet another thread pool.
Okay. I thought maybe to keep things cache local.
File be/src/runtime/
PS7, Line 235: call
nit: delete call too. 'C' of RPC is call.
PS7, Line 257: (e.g. Connection object was shutdown due to network errors)
with thrift RPC, wouldn't we have retried making a connection and doing the RPC in this case?
 and now with KRPC we will fail the query? Is that true?
PS7, Line 273: current_batch_
PS7, Line 554:     while (rpc_in_flight_) {
             :       rpc_done_cv_.wait(l);
             :     }
             :   }
one liner?
PS7, Line 558:   DCHECK(!rpc_in_flight_);
that becomes vacuous given the while loop condition.
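
One possible reading of the one-liner suggestion, sketched with std::condition_variable. This is an assumption: the patch may use a different condition variable type that lacks a predicate overload. The struct and method names are made up; only rpc_in_flight_ and rpc_done_cv_ come from the quoted code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the channel state touched by the quoted loop.
struct ChannelSketch {
  std::mutex lock_;
  std::condition_variable rpc_done_cv_;
  bool rpc_in_flight_ = false;

  // Blocks until no RPC is in flight.
  void WaitForRpc() {
    std::unique_lock<std::mutex> l(lock_);
    // Equivalent to: while (rpc_in_flight_) rpc_done_cv_.wait(l);
    rpc_done_cv_.wait(l, [this] { return !rpc_in_flight_; });
    // The predicate already guarantees !rpc_in_flight_ here, so a check
    // placed right after the wait adds nothing.
  }

  // Completion path: clear the flag under the lock, then wake any waiter.
  void MarkRpcDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_flight_ = false;
    }
    rpc_done_cv_.notify_all();
  }
};
```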
File be/src/runtime/row-batch.h:
PS7, Line 349: output_batch
outbound_batch (or rename the param)
File be/src/runtime/
PS7, Line 426:   result += header.num_tuples_per_row() * sizeof(int32_t);
could you order this computation in the same order as thrift, to make it easy to see they
are doing the same thing?
PS7, Line 432: batch.tuple_offsets_.size();
what is this trying to compute? the size of the tuple_ptrs_? if so, it doesn't look right
since each offset will expand into a pointer, right? i.e. shouldn't it be tuple_offsets_.size()
* sizeof(Tuple*).

And the thrift version looks wrong too.  And the Slice version is also different and wrong
(since size() is different for Slice vs vector).
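
A small sketch of the accounting described above, under stated assumptions: ByteSliceSketch is a made-up byte view whose size() counts bytes, standing in for a Slice, and the offsets are assumed to be int32_t. It shows why the vector and Slice paths need different arithmetic to arrive at the same number of pointer bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Tuple;  // opaque stand-in; only the pointer size matters here

// Hypothetical byte view: size() is a byte count, unlike std::vector::size().
struct ByteSliceSketch {
  const uint8_t* data;
  size_t size_bytes;
  size_t size() const { return size_bytes; }
};

// Vector form: size() is the number of offsets, and each offset expands
// into one Tuple* after deserialization.
size_t TuplePtrBytes(const std::vector<int32_t>& tuple_offsets) {
  return tuple_offsets.size() * sizeof(Tuple*);
}

// Byte-view form: size() is in bytes, so convert to an offset count first.
size_t TuplePtrBytes(const ByteSliceSketch& tuple_offsets) {
  size_t num_offsets = tuple_offsets.size() / sizeof(int32_t);
  return num_offsets * sizeof(Tuple*);
}
```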
PS7, Line 433: batch.header_.num_tuples_per_row() * sizeof(int32_t);
what is this accounting for?
PS7, Line 440: batch.header_.num_tuples_per_row() * sizeof(int32_t);
what's that accounting?


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Comment-Date: Fri, 03 Nov 2017 23:07:09 +0000
Gerrit-HasComments: Yes
