impala-reviews mailing list archives

From "Michael Ho (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Wed, 25 Oct 2017 22:54:52 GMT
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 4:


Replying to some of the comments for now. Will look into removing ProtoRowBatch next. Will not
rebase until the next version of the patch is pushed.
File be/src/common/
PS3, Line 246: void Status::FromProto(const StatusPB& status) {
> this is the same as FromThrift() effectively, right? Can we make the two lo
PS3, Line 262: void Status::FreeMessage() noexcept {
> same comment. let's make this and ToThrift look the same so it's obvious th
File be/src/exec/
PS3, Line 109:   RETURN_IF_CANCELLED(state);
> why do we do this in some Open() but not all? Should we just do it in ExecN
Actually, I noticed similar patterns in other exec nodes. Let me keep this change for now
and do the refactoring in a separate change.
File be/src/rpc/rpc-mgr.h:
PS3, Line 154:   ~RpcMgr() {
> nit: one-liner?
File be/src/rpc/
PS3, Line 77:   VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name();
> Should we add a log message stating which services we registered with KRPC?
File be/src/runtime/krpc-data-stream-mgr.h:
PS3, Line 63: tiple RPCs. The logical connection between a pair of client 
> I don't think that's accurate. see questions in krpc-data-stream-recvr.h ab
Comment rephrased.
PS3, Line 93: After the first batch has been received, a sender continues to send batches,
> XXX check whether these are really different
PS3, Line 94: () RPC
> what buffer? do you mean queue?
PS3, Line 108: 
> what does that mean?  Is it saying that during unordinary operation, a send
It means the fragment instance completed without hitting any error. If a fragment instance
ends early, it may end up not calling the EOS() RPC. For instance, if there is any cancellation,
the stream will just be torn down without sending EOS, as it's expected that the receivers'
fragments will be cancelled too.
PS3, Line 140: RPCs, and may be cancell
> what are "it" and "its" here? "the sender" and "the RPC's"?
PS3, Line 141: /// time. If an in-flight RPC is cancelled at the sender side, the reply from the receiver
> is that still true now that we have cancellation of RPCs?
Yup. If an RPC is cancelled before the result arrives, the KRPC code will just ignore the
result; it will be dropped silently by the RPC layer.
PS3, Line 153: 
> sending fragment?
PS3, Line 164:  structure is const
> is that because the recvr hasn't showed up yet, or because the stream's que
both. Comments updated.
PS3, Line 166: 
> is that talking about the 'request' field below, or something different?
PS3, Line 175:   kudu::rpc::RpcContext* rpc_context;
> what's the relationship between this and proto_batch?
proto_batch is the inbound row_batch populated from information in 'request' and 'rpc_context'.
I agree that it's not strictly necessary to keep it in TransmitDataCtx.
PS3, Line 178:   /// such as the destination finst ID, plan node ID and the row batch header.
> who owns it?
'context'. Comments added.
PS3, Line 235: the mai
> dest_node_id?
PS3, Line 239: Create a receiver for a specific fragment_instance_id/dest_node_id.
             :   /// If is_m
> that seems unnecessary but don't change it now.
The problem is that Close() of a receiver is not synchronized with the service threads which
add the row batches. So, it's possible that there are still outstanding references to the
receiver after it has been closed by the owning exchange node.

We should fix this by synchronizing the Close() of a receiver with all outstanding service
threads which hold references to it.
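One way to sketch that synchronization, assuming a hypothetical reference tracker (none of these names exist in Impala): service threads take a reference before touching the receiver, and Close() waits until all references drain before freeing anything.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch only: service threads call AddRef()/ReleaseRef() around
// any use of the receiver; the owning exchange node calls WaitForDrain() in
// Close() before freeing buffered row batches.
class RecvrRefTracker {
 public:
  void AddRef() {
    std::lock_guard<std::mutex> l(lock_);
    ++refs_;
  }
  void ReleaseRef() {
    std::lock_guard<std::mutex> l(lock_);
    if (--refs_ == 0) drained_.notify_all();
  }
  // Blocks until no service thread still holds a reference.
  void WaitForDrain() {
    std::unique_lock<std::mutex> l(lock_);
    drained_.wait(l, [this] { return refs_ == 0; });
  }
  int refs() {
    std::lock_guard<std::mutex> l(lock_);
    return refs_;
  }

 private:
  std::mutex lock_;
  std::condition_variable drained_;
  int refs_ = 0;
};
```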
PS3, Line 246: t
> 'proto_batch'?
PS3, Line 248: o
> 'request'.
PS3, Line 266: The RPC may not be res
> is this an RPC handler? I think we should just be explicit about which of t
PS3, Line 267: 
> what RPC is this talking about? If this is a handler, then it's clear.
PS3, Line 274: 
> Does it close or cancel? (or is there no difference?)
Cancel is more accurate. Close() will free all the buffered row batches. Cancel() will just
mark all sender queues as cancelled so no more row batches can be enqueued.
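The distinction can be sketched roughly as follows (hypothetical names, not the actual KrpcDataStreamRecvr code): Cancel() only marks the queue so nothing more can be enqueued, while Close() additionally frees what is still buffered.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <mutex>
#include <string>

struct RowBatch { std::string data; };  // stand-in for the real row batch

// Illustrative sender-queue sketch showing the Cancel()/Close() difference.
class SenderQueue {
 public:
  // Cancel: reject future enqueues, but keep already-buffered batches.
  void Cancel() {
    std::lock_guard<std::mutex> l(lock_);
    cancelled_ = true;
  }
  // Close: cancel and also drop all buffered batches.
  void Close() {
    std::lock_guard<std::mutex> l(lock_);
    cancelled_ = true;
    queue_.clear();
  }
  bool Enqueue(std::unique_ptr<RowBatch> batch) {
    std::lock_guard<std::mutex> l(lock_);
    if (cancelled_) return false;
    queue_.push_back(std::move(batch));
    return true;
  }
  size_t size() const {
    std::lock_guard<std::mutex> l(lock_);
    return queue_.size();
  }

 private:
  mutable std::mutex lock_;
  std::deque<std::unique_ptr<RowBatch>> queue_;
  bool cancelled_ = false;
};
```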
PS3, Line 284: 
> To be consistent with terminology used in class comment, maybe say "deferre
PS3, Line 314: 
> susper-nit: Capital 'W'
PS3, Line 340: 
> what is that saying? is that a misplaced comma or am I reading this wrong?
We sort all receiver IDs based on (finst_id, dest_node_id). To locate all receivers for a
given fragment instance, we call std::set::lower_bound(finst_id, 0) to identify the first
entry and then iterate until finst_id stops matching. I rephrased the comment to make it less
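That lookup scheme can be sketched with std::set and lower_bound (illustrative types only, not Impala's actual declarations):

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Receiver IDs sorted as (fragment instance id, dest node id) pairs; all
// receivers of one fragment instance are found by seeking to (finst_id, 0)
// and scanning while the first element still matches.
using FInstId = int64_t;
using NodeId = int32_t;
using RecvrId = std::pair<FInstId, NodeId>;

std::vector<RecvrId> FindRecvrsForFInst(const std::set<RecvrId>& recvrs,
                                        FInstId finst_id) {
  std::vector<RecvrId> out;
  // lower_bound((finst_id, 0)) lands on the first receiver of this instance.
  for (auto it = recvrs.lower_bound({finst_id, 0});
       it != recvrs.end() && it->first == finst_id; ++it) {
    out.push_back(*it);
  }
  return out;
}
```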
PS3, Line 341: } else if (a.first.
> I don't understand this.  it kinda sounds like we're trying to be able to f
Clarified in the new comment.
PS3, Line 349:     }
> hmm, I guess we need this now that we can't block the RPC thread?
Yes. In essence, anything which blocks needs to be either stashed somewhere or replied to
with an error status.
PS3, Line 358: ntRe
> Monotonic time
PS3, Line 374: 
> monotonic time
PS3, Line 382:   /// or b) the Maintenance() thread detects that the longest-waiting sender
has been
> all this parallel startup stuff really needs to be revisited (but not for t
PS3, Line 386: 
> maybe call it DeserializeDeferred() or DeserializeWorker() to make it clear
PS3, Line 404:   void EnqueueDeferredBatch(DeserializeWorkItem&& payload);
> how about grouping this with Deferred function above since it's related. Al
PS3, Line 413: ndDat
> what's that?
PS3, Line 414: /// st
> I think that status is not getting checked by the caller. I thought Tim mad
Added check for return status.
PS3, Line 416:   void AddEarlyClosedSender(const TUniqueId& fragment_instance_id,
> let's add a quick comment.
PS3, Line 421: n empty shared_ptr if
> RespondToTimedOutSender() or RespondTimeOutToSender()?
File be/src/runtime/
PS3, Line 112:   for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
             :      EnqueueDeferredBatch({recvr->fragment_instance_id(), move(ctx)});
             :      num_senders_waiting_->Increment(-1);
             :   }
             :   for (unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
             :      recvr->RemoveSender(ctx->request->sender_id());
             :      Status::OK().ToProto(ctx->response->mutable_status());
             :      ctx->rpc_context->RespondSuccess();
             :      num_senders_waiting_->Increment(-1);
             :   }
> It's not possible for the same sender to be in waiting_senders_ctxs and clo
Yes, it's impossible, as an early sender shouldn't have sent EOS() without waiting for the
reply to its previous TransmitData() RPC.
PS3, Line 140:   }
             :   RecvrId recvr_id = make_pair(finst_id, dest_node_id);
             :   if (closed_stream_cache_.find(recvr_id) != closed_stream_cache_.end()) {
             :     *already_unregistered = true;
             :   }
             :   return shared_ptr<KrpcDataStreamRecvr>();
             : }
> I'm thinking it makes sense to prioritize finding the receiver with the ass
PS3, Line 151: vrId recvr_id = make_pair(finst_i
> We could merge the implementations of AddEarlySender() and AddEarlyClosedSe
Probably not worth it.
PS3, Line 174:     // closed_stream_cache_), the sender is timed out by the maintenance thread.
> Add comment "In the worst case, this RPC is so late that the receiver is al
PS3, Line 242: 
             :   {
             :     // TODO: Move this to maintenance thread.
             :     // Remove any closed streams that have been in the cache for more than
             :     // STREAM_EXPIRATION_TIME_MS.
             :     lock_guard<mutex> l(lock_);
             :     ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
             :     int64_t now = MonotonicMillis();
             :     int32_t before = closed_stream_cache_.size();
             :     while (it != closed_stream_expirations_.end() && it->first < now) {
             :       closed_stream_cache_.erase(it->second);
             :       closed_stream_expirations_.erase(it++);
             :     }
             :     DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
             :     int32_t after = closed_stream_cache_.size();
             :     if (before != after) {
             :       VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to "
             :                  << after << ", eviction took: "
> Historically, we never had a maintenance thread, which is why we did the st
It's not a faster response to the EOS RPC, because the RPC has already been responded to at
line 240, but I agree that we should avoid blocking the service threads on this chore.

Added a TODO.
PS3, Line 289: }
             :   const string msg = Substitute(
             :       "Unknown row r
> Should this be a DCHECK instead?
This may still be useful if the receiver is closed twice due to error.
PS3, Line 303: ile (iter != fragment_recvr_set_.end() && iter->first == finst_id)
> Just realized that we could do this more efficiently. Instead of doing an O
Not sure I understand the proposed idea. As discussed offline, we shouldn't call FindRecvr()
for more than the number of receivers in a fragment instance.
File be/src/runtime/krpc-data-stream-recvr.h:
PS3, Line 48: Single receiver of an m:n data stream.
> Either the "an" shouldn't be there or streams shouldn't be plural. But I'm 
Yes, it should be m:n data stream without 's'. So, the definition would be all (send, recvr)
PS3, Line 116: sponded t
> that doesn't seem accurate
PS3, Line 117: an't be
> what's that?
PS3, Line 126: 
> mentioned above, we didn't clearly define what a "stream" actually is, so i
See reply above.
PS3, Line 141: 
> is this in bytes?
Yes. Fixed.
PS3, Line 142: _;
> wouldn't it be more accurate (and consistent with mgr terminology) to say "
File be/src/runtime/
PS1, Line 225:     ++num_pending_enqueue_;
> But batch_queue_ will still be empty, so other callers of AddBatch() will s
The lock at line 196 serializes all callers but yes, this is fixed in PS4.
File be/src/runtime/
PS3, Line 192:   RpcContext* context) {
This lock is pessimistic and prevents multiple threads from deserializing multiple row batches
in parallel.
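A sketch of the alternative, with purely hypothetical names: do the expensive deserialization before taking the lock, and hold the lock only to publish the result, so multiple service threads can deserialize in parallel.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct RowBatch { std::string rows; };  // stand-in for the real row batch

// Illustrative receiver queue: Deserialize() runs outside the critical
// section; only the push of the finished batch is serialized by the lock.
class RecvrQueue {
 public:
  void AddSerialized(const std::string& serialized) {
    RowBatch batch = Deserialize(serialized);  // expensive work, no lock held
    std::lock_guard<std::mutex> l(lock_);
    batches_.push_back(std::move(batch));
  }
  size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return batches_.size();
  }

 private:
  // Toy deserialization standing in for RowBatch::Deserialize().
  static RowBatch Deserialize(const std::string& s) { return RowBatch{s}; }

  std::mutex lock_;
  std::vector<RowBatch> batches_;
};
```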
PS3, Line 253:   lock_guard<SpinLock> l(lock_);
             :   DCHECK_GT(num_remaining_senders_, 0);
             :   num_remaining_senders_ = max(0, num_remaining_senders_ - 1);
             :   VLOG_FILE << "decremented senders: fragment_instance
> Is this that important that we have to do it while holding the lock?
Useful for debugging a missing EOS, but it can be done outside of the lock. It's not on at
the default log level though.
File be/src/runtime/krpc-data-stream-sender.h:
PS3, Line 53: ender instance, and is unique within a fragment.
> it's not clear what that means from just reading the header, though i know 
PS3, Line 59: ONED (broadc
> that's not documented.
PS3, Line 97:   friend class DataStreamTest;
> should that really be public? seems more like a worker function.
Moved to private.
PS3, Line 101:   virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
> that seems weird. is it for testing? if so, can it be protected instead?
PS3, Line 109:   class Channel;
> why is that protected? is this a testing thing?
It overrides Init() in DataSink, which is protected. It's called from DataSink::Create(),
so it shouldn't be public; the derived classes of DataSink may still call DataSink::Init(),
so it's protected.
File be/src/runtime/
PS3, Line 89: // Note that due to KUDU-2011, timeout cannot be used with outbound sidecars.
The client
> I think we should add a reference to KUDU-2011 somewhere here like:
PS3, Line 92: r ref
> query? does it mean fragment instance?
PS3, Line 124: status if serialization
> is that still accurate?
Removed. Comments carried over from the thrift implementation.
PS3, Line 139:   // Flushes any buffered row batches and sends the EOS RPC to close the channel.
> that could use a comment.
PS3, Line 141: 
             :   int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
> those could be commented together to say they identify the destination. it'
The names are carried over from the existing implementation. Comments may help.

Please see my reply to the comments about "req_" and "response_" fields below.
PS3, Line 152:   int buffer_size_;
> Add "Not used for unpartitioned and random partitioned types".
PS3, Line 159: ress address_;
> caps since constant?
I renamed this to NUM_OUTBOUND_BATCHES for now. The Google C++ style guide suggests otherwise,
but I think we don't follow the guideline strictly (e.g. the use of LLVM_CLASS_NAME
in various places).

Yes, there is an implicit assumption about how WaitForRpc() waits for the in-flight RPC
to complete, so bumping this number will not provide any benefit. That said, it helps
readability to have a name for this value instead of a magic constant.

I plan to rewrite this part of the code later to use a queue which will allow more than 2
outbound row batches. I don't want to add extra complication to this already large patch.
Will leave a TODO.
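The current two-buffer scheme can be sketched like this (NUM_OUTBOUND_BATCHES matches the constant mentioned above; everything else is illustrative, not Impala's actual code):

```cpp
#include <array>
#include <cassert>
#include <string>

constexpr int NUM_OUTBOUND_BATCHES = 2;

// Illustrative double-buffer: one entry holds the in-flight (or last
// completed) RPC's serialized batch, the other is free for the execution
// thread to serialize the next batch into.
class OutboundBuffers {
 public:
  // Returns the buffer NOT used by the in-flight/last RPC; advances the index.
  std::string* NextFreeBuffer() {
    current_batch_idx_ = (current_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
    return &buffers_[current_batch_idx_];
  }
  int current_batch_idx() const { return current_batch_idx_; }

 private:
  std::array<std::string, NUM_OUTBOUND_BATCHES> buffers_;
  int current_batch_idx_ = -1;  // no batch serialized yet
};
```

A real queue (the TODO above) would generalize this to more than two outstanding serialized batches, with the completion callback scheduling the next send.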
PS3, Line 175: llptr
> proxy_
PS3, Line 178:   // buffer for the serialized row batches. When one is used for the in-flight
             :   // the execution thread can continue to run and serialize another row batch to the
             :   // other entry. 'current_batch_idx_' is the index of the entry being used by the
             :   // in-flight or last completed RPC.
             :   // TODO: replace this with an actual queue. Schedule another RPC callback in the
             :   // completion callback if the queue is not empty.
             :   CachedProtoRowBatch cached_proto
> Why do we need to store these in the class? Can't they be local in the func
We need to access these fields from the callback (e.g. due to retry).

The 'req_' fields may be generated on the fly, but the response buffer definitely needs to live
for the entire duration of the RPC call. Will look into generating 'req_' on the fly, as its
lifetime need not be longer than the RPC invocation itself.
PS3, Line 197:   RpcController rpc_controller_;
> why is that needed now? 
It saves some bandwidth if the receiver is already closed. It's also a stop-gap fix for IMPALA-3990.

If the remote receiver is closed (e.g. hitting a limit), it will live in the closed_stream_cache_
until it gets evicted after STREAM_EXPIRATION_TIME_MS ms. After the receiver is evicted from
this cache, all future calls to this receiver will fail with a timeout. If we have this flag
here, we will save the sender from hitting the timeout issue in most cases. Granted, this
is not a fool-proof solution for IMPALA-3990 (e.g. the sender may not send anything for
STREAM_EXPIRATION_TIME_MS ms), but it will help in most cases. In the long run, we should have
a better answer for IMPALA-3990. Please also see my previous reply to Tim's comments.

Cancelling the fragment instance may need some thought. For the current partitioning strategies,
closing one of the channels shouldn't prevent a sender from sending to other channels unless
there is only one channel.
PS3, Line 200:   TransmitDataRequestPB req_;
> I think it would help to associate this with rpc_in_flight_ - move them adj
PS3, Line 218: 
> if we have the constant, shouldn't that use it?
This function is removed in the latest patch.
PS3, Line 229: 
> nit: re-invokes or retries
PS3, Line 242:   // the actual RPC when the RPC is rescheduled.
> Add a comment "Should only be called from the main fragment instance thread
PS3, Line 290:   void EndDataStreamCompleteCb();
> It would be nice to separate out the responsibility of setting of certain s
PS3, Line 334:   SCOPED_TIMER(parent_->state_->total_network_send_timer());
             :   // Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled.
             :   while (rpc_in_flight_ && !ShouldTerminate()) {
> seems simpler to just write:
Done. Will switch over to ConditionVariable if the need arises in the future, since TimedWait()
takes a boost::unique_lock<boost::mutex>.
PS3, Line 397: 
> CurrentProtoBatch().
PS3, Line 402:       remote_recvr_closed_ = true;
> Would the case where the RPC got cancelled from L547 fall here? Or would it
It can go to either line 406 or here, depending on whether the completion callback is invoked
before the cancellation event is processed. Either way, the cancellation from L547 will not check
PS3, Line 528:   return Status::OK();
> if (UNLIKELY(remote_recvr_closed_)) return Status::OK(); ?
PS3, Line 644:   return Status::OK();
> The RowBatch is serialized once per channel which is very wasteful. 
File be/src/runtime/row-batch.h:
PS3, Line 52: struct ProtoRowBatch {
> I think we should get rid of this structure all together. IMO, the indirect
ProtoRowBatch is a conceptual representation of a serialized row batch on both the sender
and receiver sides. A more appropriate name could be "SerializedRowBatch". I find it easier
to have everything encapsulated in ProtoRowBatch when passing it to RowBatch::Deserialize(),
hiding the details of how it's constructed inside the RPC handler.

Will look into removing the extra layer of indirection.
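The encapsulation idea can be sketched as follows; "SerializedRowBatch" and the toy Deserialize() are purely illustrative, not Impala's actual definitions:

```cpp
#include <cassert>
#include <string>

// Bundle the serialized header and the tuple-data sidecar into one struct so
// a single object can be handed to Deserialize(), hiding how the RPC handler
// assembled it.
struct SerializedRowBatch {
  std::string header;      // metadata: row count, tuple offsets, etc.
  std::string tuple_data;  // tuple bytes received as an RPC sidecar
};

struct RowBatch {
  int num_rows = 0;
  static RowBatch Deserialize(const SerializedRowBatch& batch) {
    RowBatch out;
    // Toy deserialization: treat each byte of tuple_data as one row.
    out.num_rows = static_cast<int>(batch.tuple_data.size());
    return out;
  }
};
```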
PS3, Line 89: CachedProtoRowBatch
> what is "cached" about this?
I had a hard time coming up with a good name to indicate the re-use of the vector and string
buffers below. Maybe "reusable" is closer to the actual meaning, but it's also quite confusing.

Will switch to OutboundRowBatch and look into merging this with ProtoRowBatch above.
File be/src/service/
PS3, Line 49:   // CloseSender() is guaranteed to eventually respond to this RPC so we don't
do it here.
> nit: "CloseSender() is guaranteed to eventually respond to this RPC, so we 
File common/protobuf/data_stream_service.proto:
PS3, Line 29:   // Id of this fragment in its role as a sender.
            :   optional int32 sender_id = 2;
> what are "IDs" in these cases? let's improve the documentation here. Especi
There is no equivalent of typedef in protobuf as far as I can tell.
Comments updated.
File common/protobuf/row_batch.proto:
PS3, Line 30: 
> in thrift we had TTupleId. Is there a reason we aren't defining those types
As far as I know, there is no equivalent of typedef in protobuf. We can try defining a message
with a single field but this seems unnecessarily cumbersome.
PS3, Line 32: = 2;
> what's tuple_data? not a field in this structure...
That's the tuple data sent as sidecar. Clarified in the new comments.
PS3, Line 39: The 
> size of what?
Size of tuple_data. Comments fixed.
PS3, Line 42: ion is applied.
> do we plan to fix that?
Fixed in latest patch.


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 4
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Reviewer: Tim Armstrong <>
Gerrit-Comment-Date: Wed, 25 Oct 2017 22:54:52 +0000
Gerrit-HasComments: Yes
