impala-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Henry Robinson (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-{4670,4672,4784}: Add RpcMgr and port Statestore services to KRPC
Date Fri, 27 Jan 2017 22:17:52 GMT
Henry Robinson has posted comments on this change.

Change subject: IMPALA-{4670,4672,4784}: Add RpcMgr and port Statestore services to KRPC

Patch Set 3:


Although it might look like a lot has changed in this new patch, there are only two relatively
minor big changes:

1. Rewrite in C++ (see to make sure we don't lose any
of that coverage.

2. Remove InProcessStatestore which was unused and causing pain to keep up-to-date.
File be/src/rpc/common.proto:

Line 27: message ThriftWrapperPB { required bytes thrift_struct = 1; }
> a brief comment regarding the purpose would be good.
File be/src/rpc/

Line 44: DEFINE_int32(num_acceptor_threads, 2, "Number of threads dedicated to accepting "
> why so low?
Acceptor threads don't do much (just create a socket and pass it to the reactor threads).
This is unlike Thrift where the acceptor thread also did the SASL negotiation, which was expensive
and harmed throughput. So really this is double the number of Thrift threads, and each will
have less work to do per cnxn.

We expect the number of connections to be much lower with KRPC as well.

PS3, Line 52: 4
> Yeah, this should be plumbed through, will do that.

Line 59:   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
> dcheck that services haven't been started yet, or does that not matter?

Line 62:       new ServicePool(move(scoped_ptr), messenger_->metric_entity(), service_queue_depth);
> why not just pass service_ptr.release()?
I can't quite do that because the gscoped_ptr<T>(T*) c'tor is explicit. I can do:

  new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release())

PS3, Line 64: service_pool->Init
> good point. if the init call fails, do we abort impalad startup? (i would a
yes indeed (and statestore startup).

PS3, Line 74: int32_t num_acceptor_threads
> so then move the flag to where it's used?
it's used in several places (anywhere that has an RpcMgr might want to use the flag). So I
think the DEFINE(..) here, DECLARE(...) in modules that want to use it is the right pattern.
File be/src/rpc/rpc-mgr.h:

PS3, Line 41: 0
> what does it mean to manage 0 services?
It means that there are 0 services managed by the RpcMgr? Since the RpcMgr also helps get
proxies, you don't have to register a service for it to be useful.

PS3, Line 49: CLIENTS
> why define new terminology? isn't this just "proxy" in KuduRPC speak?
I've tried to standardize a bit better. 'clients' and 'servers' are pretty established nouns
in this space though.

PS3, Line 52: GetProxy
> so I guess RpcMgr deals with both the server and client side of things?  As
See line 36: "Central manager for all RPC services and clients".

A service is a network-exported set of methods that can be called using the KRPC protocol.
Speaking precisely, a service 'definition' is the set of methods (defined in a protobuf),
but a service is the instantiation of that service definition on a particular machine. It's
a server-side only construct. 

A proxy allows clients to easily call methods on one service, and so they share the same set
of method signatures (i.e. they refer to the same service definition). 

Typically a proxy and a service are used from different machines, and KRPC does not check
that the remote service has been started before creating a proxy object. Connectivity / service
availability checks are performed when the actual RPC method is invoked.

Line 64: /// RpcMgr::RegisterService() before RpcMgr::StartServices() is called.
> does init() go before register()?

Line 111:   /// Creates a new proxy for a remote service of type P at location 'address',
and places
> might want to point out here what P needs to be (auto-generated from a serv
File be/src/rpc/rpc-mgr.inline.h:

Line 39:       kudu::HostPort(address.hostname, address.port).ResolveAddresses(&addresses),
> how expensive is this?
It's a DNS subsystem call. 

Without the nscd (nameservice caching daemon) running, the mean response time was ~5ms (although
the 99.9th percentile was lower - there are larger timeout events that are expensive).

However, nscd brings the mean response time down to about 10us per invocation. We (Cloudera)
recommend that users have nscd running for CDH deployments; we should make sure that that
is emphasised for Impala as well (it would make a difference no matter if KRPC was used or

nscd also seems to reduce the frequency of outliers as well.

Line 41:   proxy->reset(new P(messenger_, addresses[0]));
> dcheck that addresses isn't empty?
File be/src/rpc/rpc.h:

PS3, Line 36: Clients
> over in rpc-mgr.h, you seemed to use "client" as a synonym for proxy, but w
I've reworded rpc-mgr.h to standardize on Proxy. I'll write 'callers' here to avoid ambiguity.

'A single RPC instance' is a bit confusing, I've reworded. But it's intended to mean a single
invocation of a remote method. The point at which you consider a method 'invoked' seems to
be causing some confusion: here I mean to be the point at which application logic actually
gets called on the server side.

PS3, Line 48: Rpc
> I was confused about this class at first since the name sounds like it's th
Retry semantics are often application-specific. For example, Kudu have different retry requirements
than us - for example, they want to fail over to different replicas. (They do have a retry
object, but it's very Kudu-specific).

I don't think it would help to call this a Wrapper class. From the perspective of the caller,
this class presents the abstraction of an RPC: the caller invokes a method (via Execute()),
and gets a response.

Line 51:   /// 'RESP' (must both be protobuf).
> unclear what req and resp refer to
Outdated comment, done.

Line 58:   Rpc(const TNetworkAddress& remote, RpcMgr* mgr) : remote_(remote), mgr_(mgr)
> does this need to be public?

Line 68:   Rpc& SetMaxAttempts(int32_t max_attempts) {
> does this need to be specific (int32_t as opposed to int)?
It doesn't have to be, but I thought in general we tried to be explicit. Can revert it to
an int if preferred.

PS3, Line 90: func
> point out in what way F is constrained

Line 91:     if (max_rpc_attempts_ <= 1) {
> why not dcheck in set..()? there's no reason this should be a runtime error
Typo - should be <, not <=.

Re: DCHECK vs runtime error - I can see these parameters being controlled by flags (e.g the
timeout for the data stream sender), so they will need to have runtime checks.

Line 112:       if (!controller.status().IsRemoteError()) break;
> comment on significance of remote error. looks like you're getting a non-ok
Rewritten this to make it a bit clearer. I've introduced IsRetryableError() (which is also
needed in a subsequent patch) to abstract the logic to decide if a retry is needed.

Line 143:   /// TODO(KRPC): Warn on uninitialized timeout, or set meaningful default.
> why warn? if you don't set a timeout, it shouldn't time out.
I think we should just set a default. It's now a *really* bad idea to have RPCs without timeouts,
because of the limited set of service threads.
File be/src/rpc/rpc_test.proto:

Line 18: package impala;
> add comment describing what this is used for
File be/src/runtime/exec-env.h:

Line 159:   TNetworkAddress subscriber_address_;
> same address as the generic backend service, meaning it'll be removed once 
Yes - this will go away.
File be/src/statestore/

Line 77: class StatestoreSubscriberImpl : public StatestoreSubscriberIf {
> calling it 'impl' isn't super-meaningful (impl of what?). would be nice to 
How about just 'service'? ServiceImpl is a bit clunky, and there aren't going to be multiple
implementations of the *If class.

Line 207:       return Status("--statestore_subscriber_num_svc_threads must be at least 2");
> why not just bump it up to 2?
I prefer to avoid surprise by changing flags if they're set wrong. I suspect no-one's going
to set this lower.
File be/src/statestore/statestore-subscriber.h:

Line 128:   TNetworkAddress subscriber_address_;
> wouldn't that be the same as the generic backend address?
Yes - when the ImpalaInternalService is ported to KRPC, this should go away.

To view, visit
To unsubscribe, visit

Gerrit-MessageType: comment
Gerrit-Change-Id: I8dbf12b9ecd71d26d239d31c19b487175194c766
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: David Knupp <>
Gerrit-Reviewer: Henry Robinson <>
Gerrit-Reviewer: Juan Yu <>
Gerrit-Reviewer: Marcel Kornacker <>
Gerrit-Reviewer: Sailesh Mukil <>
Gerrit-Reviewer: Taras Bobrovytsky <>
Gerrit-HasComments: Yes

View raw message