impala-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Ho (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Fri, 27 Oct 2017 18:28:29 GMT
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, 

I'd like you to reexamine a change. Please visit

to look at the new patch set (#5).

Change subject: IMPALA-4856: Port data stream service to KRPC

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multi-plex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In thrift RPC implementation, we make a thrift
thread handling a TransmitData() RPC to block for extended period of time
when the receiver is not yet created when the call arrives. In KRPC
implementation, we store TransmitData() or EndDataStream() requests
which arrive before the receiver is ready in a per-receiver early sender list
stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to
when the receiver is created or when timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr causes the buffer limit
to exceed, the request will be stashed in a blocked_sender_ queue to be processed
later. The stashed RPC request will not be responded to until it is processed
so as to exert back pressure to the client. An alternative would be to reply with
an error and the request / row batches need to be sent again. This may end up
consuming more network bandwidth than the thrift RPC implementation. This change
adopts the behavior of allowing one stashed request per sender.

All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.


* Build passed with FLAGS_use_krpc=true.


* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
M be/src/common/
M be/src/common/status.h
M be/src/exec/
M be/src/exec/
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/
M be/src/runtime/exec-env.h
M be/src/runtime/
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/
A be/src/service/data-stream-service.h
M be/src/service/
M be/src/util/
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/
34 files changed, 2,929 insertions(+), 175 deletions(-)

  git pull ssh:// refs/changes/23/8023/5
To view, visit
To unsubscribe, visit

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 5
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Sailesh Mukil <>

  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message