impala-reviews mailing list archives

From "Sailesh Mukil (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Mon, 25 Sep 2017 14:49:37 GMT
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 1:

File be/src/runtime/krpc-data-stream-mgr.h:
PS1, Line 110: sent
nit: received
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             : /// payload.
Why would there be a TransmitData() RPC if EndDataStream() has already been sent? Doesn't
the sender send it only if it knows all its TransmitData() RPCs have been processed?
PS1, Line 133: /// period expires.
As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here for later fixing.
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             : ///   not. That's enough state to realise that a receiver has failed (rather than not
             : ///   prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             : ///   having the closed-stream list.
It would be nice to have a JIRA for this and reference it here.
PS1, Line 353: waiting_senders
This is a little confusing to follow in the .cc file: when I see "waiting_senders",
I expect a set of unique sender IDs rather than a list of RPC contexts.

Although each entry is unique to a specific sender, it would be a little clearer to call this 'waiting_senders_ctxs'.

Let me know what you think.
PS1, Line 356: closed_senders
Similarly, we could call this 'closed_senders_ctxs'.
File be/src/runtime/
PS1, Line 168:       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
             :       RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             :       auto payload =
             :           make_unique<TransmitDataCtx>(proto_batch, context, request,
             :       early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
I'm wondering if it makes sense to add simple inline functions that encapsulate this functionality,
for the sake of readability.

E.g.: AddEarlyWaitingSender(), AddEarlyClosedSender()
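
To make the suggestion concrete, here is a rough sketch of what such helpers might look like. Everything except the names AddEarlyWaitingSender(), AddEarlyClosedSender(), early_senders_map_, waiting_senders and closed_senders is a hypothetical stand-in: the context structs are reduced to a single field, RecvrId is simplified, plain integers replace the metrics, and locking is omitted.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the real RPC context types.
struct TransmitDataCtx { int sender_id = 0; };
struct EndDataStreamCtx { int sender_id = 0; };

// Simplified receiver key: (fragment instance id, destination node id).
using RecvrId = std::pair<std::int64_t, int>;

struct EarlySendersList {
  std::vector<std::unique_ptr<TransmitDataCtx>> waiting_senders;
  std::vector<std::unique_ptr<EndDataStreamCtx>> closed_senders;
};

class EarlySenderTracker {
 public:
  // Queues a TransmitData() context for a receiver that is not prepared yet.
  void AddEarlyWaitingSender(const RecvrId& recvr_id,
                             std::unique_ptr<TransmitDataCtx> ctx) {
    early_senders_map_[recvr_id].waiting_senders.push_back(std::move(ctx));
    RecordWait();
  }

  // Queues an EndDataStream() context for a receiver that is not prepared yet.
  void AddEarlyClosedSender(const RecvrId& recvr_id,
                            std::unique_ptr<EndDataStreamCtx> ctx) {
    early_senders_map_[recvr_id].closed_senders.emplace_back(std::move(ctx));
    RecordWait();
  }

  std::size_t NumWaiting(const RecvrId& recvr_id) const {
    auto it = early_senders_map_.find(recvr_id);
    return it == early_senders_map_.end() ? 0 : it->second.waiting_senders.size();
  }

  std::size_t NumClosed(const RecvrId& recvr_id) const {
    auto it = early_senders_map_.find(recvr_id);
    return it == early_senders_map_.end() ? 0 : it->second.closed_senders.size();
  }

  std::int64_t num_senders_waiting() const { return num_senders_waiting_; }
  std::int64_t total_senders_waited() const { return total_senders_waited_; }

 private:
  // Both call sites bump the same two counters, so that is shared here.
  void RecordWait() {
    ++num_senders_waiting_;
    ++total_senders_waited_;
  }

  std::map<RecvrId, EarlySendersList> early_senders_map_;
  std::int64_t num_senders_waiting_ = 0;   // stand-in for the metric
  std::int64_t total_senders_waited_ = 0;  // stand-in for the metric
};
```

With helpers along these lines, each call site shrinks to building the context and making one call, and the counter updates cannot drift apart between the two paths.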
PS1, Line 213: If no receiver found, but not in the closed stream cache
nit: If no receiver is found, and the receiver is not in the closed stream cache either,
we still need...
PS1, Line 218:       RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             :       auto payload = make_unique<EndDataStreamCtx>(context, request,
             :       early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             :       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
AddEarlyClosedSender() as per comment above, if you agree.
PS1, Line 227:   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             :   Status::OK().ToProto(response->mutable_status());
             :   context->RespondSuccess();
This may need some modification based on the recent commit for IMPALA-5199:
File be/src/runtime/
PS1, Line 75: ("new data")
I'm having some trouble understanding what this means. Could you please clarify?
PS1, Line 271: data_arrival_cv_.notify_all();
Shouldn't this notify be done while holding the lock_ ?
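
For reference, a minimal sketch of the pattern being asked about, written against std::mutex and std::condition_variable rather than whatever primitives the patch actually uses (that substitution, the class name and the int payload are assumptions). With the standard library, notifying without the lock held is permitted as long as the shared state was modified under the lock; notifying while still holding it, as below, is the more conservative form.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical, simplified queue; only lock_ and data_arrival_cv_ echo the patch.
class BatchQueueSketch {
 public:
  // Adds a batch and wakes all waiters while lock_ is still held.
  void AddBatch(int batch) {
    std::unique_lock<std::mutex> l(lock_);
    batches_.push_back(batch);
    data_arrival_cv_.notify_all();
  }

  // Marks the queue cancelled and wakes all waiters while lock_ is still held.
  void Cancel() {
    std::unique_lock<std::mutex> l(lock_);
    cancelled_ = true;
    data_arrival_cv_.notify_all();
  }

  // Blocks until a batch is available or the queue is cancelled.
  // Returns false if the queue was cancelled and no batch is left.
  bool GetBatch(int* out) {
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] { return !batches_.empty() || cancelled_; });
    if (batches_.empty()) return false;
    *out = batches_.front();
    batches_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<int> batches_;
  bool cancelled_ = false;
};
```

Because the waiter re-checks its predicate under lock_, either placement of the notify is correct here; the question is mostly one of consistency with the rest of the class.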
PS1, Line 285:   while (!blocked_senders_.empty()) {
nit: Add comment: Respond to blocked senders' RPCs
File be/src/runtime/
PS1, Line 208: %
Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compiler take care
of that?
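
A small sketch comparing the two forms, assuming cur_batch_idx_ is a plain int that only ever holds 0 or 1 (the type is an assumption; the function names are made up for illustration). For non-negative values the results are identical, and optimizing compilers typically emit a mask for `% 2` when they can tell the operand is non-negative; the forms only diverge for negative inputs.

```cpp
// Toggles between two batch slots using the remainder operator.
constexpr int NextIdxMod(int cur_batch_idx) { return (cur_batch_idx + 1) % 2; }

// Same toggle written with a bitwise AND.
constexpr int NextIdxAnd(int cur_batch_idx) { return (cur_batch_idx + 1) & 1; }

// For the only values the index should take, both forms agree.
static_assert(NextIdxMod(0) == 1 && NextIdxAnd(0) == 1, "0 toggles to 1");
static_assert(NextIdxMod(1) == 0 && NextIdxAnd(1) == 0, "1 toggles to 0");
```

If the index were ever negative the two would differ, since `%` truncates toward zero and can yield -1 while `& 1` cannot, so the rewrite is only safe under the non-negative assumption.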


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Comment-Date: Mon, 25 Sep 2017 14:49:37 +0000
Gerrit-HasComments: Yes
