impala-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Henry Robinson (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Thu, 22 Jun 2017 00:02:02 GMT
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC

Patch Set 5:


PS4 is a rebase. PS5 includes the review responses (so diff 4->5 if you want to see what
File be/src/rpc/rpc.h:

PS3, Line 106: Ownership is
             :   // shared by the caller, and the RPC subsystem
> Doesn't std::move transfer the ownership so the caller no longer shares the
The shared_ptr is copied in. It's the copy that is then moved into the sidecar list.

PS3, Line 143: are owned by the caller
> the ownership is temporarily transferred to the RPC call when this function
I don't think so - the RPC call has pointers, but doesn't have ownership in the sense that
it has no responsibility for managing a reference count or freeing the memory.

PS3, Line 147: 
> Having the names 'func', 'cb' and 'cb_wrapper' all close by each other make

PS3, Line 153: 
> Does this move mean that the params_ member is invalid after this call? If 

PS3, Line 327: 
> Maybe name this 'completion_cb'.
File be/src/runtime/

PS3, Line 389: equest->mutable_bloom_filter()->set_log_heap_space(0);
             :     request->mutable_bloom_filter()->set_directory_sidecar_idx(-1);
             :   }
> Why wouldn't a move capture ensure the same thing?
proto_filter is a const shared_ptr&. You can't move from it. Instead, we could have the
argument be shared_ptr<ProtoBloomFilter>, and move from it here; they're basically equivalent,
it's just a question of where you make the copy.
File be/src/runtime/

PS3, Line 1207:       VLOG_QUERY << "Not enough memory to allocate filter: "
              :                  << PrettyPrinter::Print(heap_space, TUnit::BYTES)
              :                  << " (query: " << coord->query_id() <<
              :       // Disable, as one missing update means a correct filter cannot be 
> I would add this to the commit message. This means we would take double the
I don't think so - because is a sidecar I don't think it's been copied since
it arrived on the socket. In the Thrift case, the bytestream had to be deserialized into a
TBloomFilter. That's what's happening here - the equivalent 'deserialization' step.

This path should only get taken the first time a filter arrives, and it does briefly keep
two filters around (the sidecar should get destroyed as soon as the RPC is responded to).
File be/src/runtime/

PS3, Line 280: blocked_senders_.front()
> Is this a right way to dispose a unique_ptr?
Good point - release() is clearer, and get() may have been a benign bug.
File be/src/runtime/

PS3, Line 58: 
> Not used.

PS3, Line 60: 
> Not used.

PS3, Line 133: scoped_ptr<RowBatch> batch_;
> No one really calls GetNumDataBytesSent() (except from our BE test). So, I'
We're gaining correctness - so worth doing (otherwise if someone decides to use it in the
future, they might run into problems).

PS3, Line 148: 
> A reader of this code might not immediately understand why this class needs
I expanded the comment here.

PS3, Line 170: 
> Why is this set in Init()? Wouldn't it ideally be set it in the constructor
Moved to c'tor.

PS3, Line 175: proto_batch_idx_
> Just want to make sure that this will increase the shared_ptr refcount? It 
Yep - this was a mistake. Removed auto to make it more explicit.

PS3, Line 203: co
> Prefer a more descriptive name "rpc_completion_cb" or something similar.

PS3, Line 214: ck_guard
> channel == nullptr

PS3, Line 252: batch->tuple_data, &idx);
> Is this transferring the ownership to the RPC subsystem ? AddSideCar() inte
The ownership is shared with the batch object. AddSidecar() internally moves from the argument,
which is a copy (i.e. its own reference).

PS3, Line 266: .release(), rpc_complete_callback);
> This is a subtle change in behavior from previous Impala version. In partic
Any reasonably conservative timeout runs the risk of false negatives if a sender is blocked.

I agree with your analysis about this being a change in behaviour. In practice, though, here's
what I hope will happen: if one write to a node is slow enough to previously trigger the timeout,
I would expect the statestore RPCs to also go slow (and they will time out); the node will
be marked as offline and the query will be cancelled. 

If there is a situation where this RPC only is slow in writing (but all other RPCs to the
server are ok), then I agree this query will not timeout where it would previously. The query
is still cancellable in that case. My reading of the current implementation is that the query
is not cancellable if it becomes blocked in send(), which is one of the reasons the send timeout

This is something else that rpc cancellation will help with.
File be/src/runtime/

PS3, Line 117:     rei

Line 216:       output_batch->compressed_tuple_data->resize(compressed_size);
> Do you think it would be good to add a comment why we don't free the 'tuple
File be/src/runtime/row-batch.h:

PS3, Line 73: != THdfsCompression::NON
> != THdfsCompression::NONE

PS3, Line 441: FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES
> Why not initialize this and the other args below with the default values in
I believe it's clearer to initialize them at the point of declaration. It also saves duplication
between > 1 c'tor.

To view, visit
To unsubscribe, visit

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <>
Gerrit-Reviewer: Henry Robinson <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-HasComments: Yes

View raw message