impala-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Henry Robinson (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC
Date Thu, 20 Apr 2017 01:18:38 GMT
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC

Patch Set 3:

File be/src/runtime/

PS3, Line 327: VLOG_QUERY << "DataStreamMgr maintenance tasks complete. Took: "
             :                << PrettyPrinter::Print(MonotonicMillis() - start, TUnit::TIME_MS);
> Do you think it's worth printing what maintenance tasks were done in this i
Removed for now.
File be/src/runtime/data-stream-mgr.h:

PS2, Line 59: In the first phase the sender initiates 
File be/src/runtime/data-stream-mgr.h:

Line 60: /// first batch. Since the sender may start sending before the receiver is ready,
the data
> we could relatively easily remove that restriction, the execrpc changes are
Any change I can think of would be substantial (two-phase start-up protocol) or bug-prone

Line 77: /// fixed-size buffer for later processing, or discard the batch if the buffer is
full. In
> how can the is-full case come up? shouldn't flow control keep the sender fr
No, for a couple of reasons:

1. Flow control doesn't kick in until the first round of batches have been sent, so all senders
try to send batches at once. 

2. In the steady-state case with a receiver that's faster than the senders, there'll be no
queuing. But if the receiver suddenly stalls there can be many senders still preparing a new
batch, and together they can fill up the queue. 

2. The queue is limited by the byte size of the batches. The next batch might be really large
and overflow the queue, and we can't predict that in general.

Line 84: /// queue. An error code is returned which causes the sender to retry the previous
> sounds complicated. is that really necessary?
I'm not sure which part you're referring to - but yes, in general this is how the flow control
is implemented. The pending sender list replaces the set of blocked threads from Thrift, and
is lighter weight because it gives us the ability to discard the row batch payload.

The error code is used to signal to the RPC layer on the sender side that the RPC should be
retried. This has the benefit of making the retry transparent to the caller code.

Line 113: /// time-out and cancel itself. However, it is usual that the coordinator will initiate
> i guess that extra complication is necessary because the non-existence of a

Line 124: /// The sender has a default timeout of 2 minutes for TransmitData() calls. If the
> why so long?
The former case is detected in almost all cases. If the receiver has shown up, and been closed,
the sender will find that in the closed stream cache.

Line 131: /// immediately. Both of these cases are designed so that the sender can prepare
a new
> in the queued case, it seems like the response should go out when the queue
In the queued batch case, we could look to see if the queue had any more capacity before responding
to the sender. However, it's hard to know how much capacity a sender needs for its next batch
(as batches with strings can vary in size a lot). This strategy is optimistic - if the receiver
is able to accept a batch, we presume that it can accept the next one as well. In my tests,
this worked better than pessimistically pausing senders until we knew for sure there was room
for their particular batch.

PS3, Line 135: idea
> I would rather use the word 'assumption' here.

Line 137: /// notification should not exceed 60s and so the sender will not time out.
> that also sounds complicated.
The sender needs a timeout, after which it will fail, and so the receiver has to try to respond
within that timeout. That's all that's going on here - picking a timeout and then picking
a response time, modulo standard distributed systems issues, that is very likely to be less
than that timeout. This avoids false negatives.

> I think there needs to be some comment clearly distinguishing between this 
I removed this because it was so similar to datastream_sender_timeout_ms that it made sense
to just use the flag for both the initial timeout, and the row-batch processing timeout.

Line 245:   void AddData(const TUniqueId& fragment_instance_id, TransmitDataCtx&&
> in other words, pass in a pointer and stipulate that AddData owns it.
I feel that rvalues do make ownership explicit. You can't pass a non-temporary rvalue without
move(X), which makes it really clear.

I think it's better to make it clear from the code (you cannot pass a TransmitDataCtx without
relinquishing ownership) than it is to use the comments to convey a convention.
File be/src/runtime/

PS3, Line 129: pending_senders_
> Maybe this could be future work, but I foresee a need to cap this at some n
Hm - why do you think failing the query's a good idea? If the receiver is slow relative to
the sender, this queue will grow, but that's not an error.

PS3, Line 137: SpinLock
> Isn't this a rather large amount of work to have under a spinlock?
Although it's quite a few lines, I don't think it's that much work. What operation looks expensive
to you?

PS3, Line 191: // num_remaining_senders_ could be 0 because an AddBatch() can arrive *after*
             :     // EndDataStream() RPC for the same sender, due to asynchrony on the sender
side (the
             :     // sender gets closed or cancelled, but doesn't wait for the oustanding
             :     // to complete before trying to close the channel).
> If this is the only case where num_remaining_senders_ can be 0, then is the
We have to clean up the RPC state and responding seems a good way to do that.
File be/src/runtime/data-stream-recvr.h:

PS3, Line 174: good_bytes_received_counter_
> bytes_accepted_counter_ ?
File be/src/runtime/descriptors.h:

Line 546:   /// Serialize to the row_tuples field of a RowBatchPb.
> something called ToProto should materialize a message that corresponds to t
File be/src/service/

Line 80:     ExecEnv::GetInstance()->stream_mgr()->AddData(finst_id, move(payload));
> This is a tad bit confusing. Why is there not a necessity to have a Respond
AddData() will always respond to the RPC, but may do so asynchronously, so we can't use a
return value here. 

If AddData() is not called, we have to respond to the RPC right here to make sure it doesn't
hang on the client. I added some comments.

PS3, Line 99: // TODO: Check return
> What he said.

Line 118:     context->GetInboundSidecar(filter.header.directory_sidecar_idx(), &;
> Same here, check return status.

Line 155:   context->RespondSuccess();
> No need to return status.SetTStatus(&return_val) ?
Good catch, done.
File be/src/service/

PS3, Line 1925: move
> Include what you use? <utility>
File be/src/service/impala_internal_service.proto:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> let's move all .proto files into common/proto, that'll make it a lot easier
The tooling is set up so that proto files fit more neatly in the existing source directories,
and during development I've found it convenient to have the generated files in the same directory
as other source files. I'd rather keep them in the source dirs - finding them is not hard.

Line 32:   // Tuples for all rows
> incomprehensible comment.
Do you mean like this:

  message TupleId {
    int32 id = 1;

If so I don't quite see the benefit vs the extra verbosity.

Line 46:   // Of type CatalogObjects.THdfsCompression (TODO(KRPC): native enum)
> why not use the enum?
We use THdfsCompression here because that's used elsewhere in the code where it's harder to
change the serialization format (i.e. in Java). I could set up a mirror compression type in
protobufs that's used for this field specifically, but that would be brittle (got to make
sure the translation is done accurately from thrift<->proto and that no cases are missed).
I think it's better to wait until we can use a proto enum everywhere.

Line 50: message TransmitDataRequestPb {
> you left out the service version (example: ImpalaInternalServiceVersion in 
I think there are some details that are underspecified in our current implementation that
need some discussion - for example, how should version mismatches be indicated to the client?
KRPC has an out-of-band error mechanism that we can use. 

Alternatively, we can add versions to response objects, with the convention that if the replied
version != the requested version, no fields will be set and the RPC should fail. 

In Thrift only the request objects have a version field.

What should we do with messages that aren't parameters? Do they have version fields? Otherwise,
how should they be versioned? (Say we want to add a field to StatusPb). Do we have StatusPbV1

I think this warrants a bit more discussion. I've changed all the fields to 'optional' (that's
the only possibility in protobuf v3, out of interest). I think that gives us some future-proofing
while we get the details nailed down, since the absence of a protocol marker can be interpreted
as V0.

Line 115: service ExecControlService {
> why not put the services in separate .proto files

Line 131: service DataStreamService {
> would it make sense to have separate patches for the two services? it feels
I've started this, but it's a bit time-consuming moving thousands of lines of changes between
commits. For now, please review the data stream changes, and eventually the control svc changes
will move into a different patch.

Line 140:   // Called by the coordinator to deliver global runtime filters to fragment
> i consider the filters to be control structures. why wouldn't they be in ex
I don't think they are control structures. They have large payloads and contain tuple data.
File be/src/service/

PS3, Line 79: exec_env->Init();
> Need to check returned status and fail if necessary.

To view, visit
To unsubscribe, visit

Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <>
Gerrit-Reviewer: Marcel Kornacker <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-HasComments: Yes

View raw message