impala-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Ho (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Thu, 09 Nov 2017 00:31:41 GMT
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, 

I'd like you to reexamine a change. Please visit

to look at the new patch set (#13).

Change subject: IMPALA-4856: Port data stream service to KRPC

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multi-plex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents avoids the possibility that a thread is stuck in the RPC code
for extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries so
the RPC handler should avoid blocking as much as possible. In thrift RPC
implementation, we make a thrift thread handling a TransmitData() RPC to block
for extended period of time when the receiver is not yet created when the call
arrives. In KRPC implementation, we store TransmitData() or EndDataStream()
requests which arrive before the receiver is ready in a per-receiver early
sender list stored in KrpcDataStreamMgr. These RPC calls will be processed
and responded to when the receiver is created or when timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr causes the buffer limit
to exceed, the request will be stashed in a queue for deferred processing.
The stashed RPC requests will not be responded to until they are processed
so as to exert back pressure to the senders. An alternative would be to reply with
an error and the request / row batches need to be sent again. This may end up
consuming more network bandwidth than the thrift RPC implementation. This change
adopts the behavior of allowing one stashed request per sender.

All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.


* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.


* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
M be/src/common/
M be/src/common/status.h
M be/src/exec/
M be/src/exec/
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/
M be/src/runtime/exec-env.h
M be/src/runtime/
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/
A be/src/service/data-stream-service.h
M be/src/service/
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/
33 files changed, 3,155 insertions(+), 184 deletions(-)

  git pull ssh:// refs/changes/23/8023/13
To view, visit
To unsubscribe, visit

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>

  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message