Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 58A39200D45 for ; Thu, 9 Nov 2017 01:31:48 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 570C7160C01; Thu, 9 Nov 2017 00:31:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 75874160BDA for ; Thu, 9 Nov 2017 01:31:47 +0100 (CET) Received: (qmail 34316 invoked by uid 500); 9 Nov 2017 00:31:46 -0000 Mailing-List: contact reviews-help@impala.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@impala.incubator.apache.org Received: (qmail 34305 invoked by uid 99); 9 Nov 2017 00:31:46 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Nov 2017 00:31:46 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 937E6180740 for ; Thu, 9 Nov 2017 00:31:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.363 X-Spam-Level: ** X-Spam-Status: No, score=2.363 tagged_above=-999 required=6.31 tests=[HTML_MESSAGE=2, RDNS_DYNAMIC=0.363, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id qNLg3c5Eyzy2 for ; Thu, 9 Nov 2017 00:31:43 +0000 (UTC) Received: from ip-10-146-233-104.ec2.internal (ec2-75-101-130-251.compute-1.amazonaws.com [75.101.130.251]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id E8E9A5FBEE for ; Thu, 9 Nov 2017 00:31:42 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by ip-10-146-233-104.ec2.internal (8.14.4/8.14.4) with ESMTP id vA90VfWj026152; Thu, 9 Nov 2017 00:31:41 GMT Message-Id: <201711090031.vA90VfWj026152@ip-10-146-233-104.ec2.internal> X-Gerrit-PatchSet: 13 Date: Thu, 9 Nov 2017 00:31:41 +0000 From: "Michael Ho (Code Review)" To: Sailesh Mukil , Mostafa Mokhtar , Dan Hecht , impala-cr@cloudera.com, reviews@impala.incubator.apache.org X-Gerrit-MessageType: newpatchset Subject: =?UTF-8?Q?=5BImpala-ASF-CR=5D_IMPALA-4856=3A_Port_data_stream_service_to_KRPC=0A?= X-Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 X-Gerrit-Change-Number: 8023 X-Gerrit-ChangeURL: X-Gerrit-Commit: 1dea658154eac0cb6ffac62042f6c385434fbdde In-Reply-To: References: Reply-To: kwho@cloudera.com, impala-cr@cloudera.com, sailesh@cloudera.com, marcelk@gmail.com, dhecht@cloudera.com, mmokhtar@cloudera.com, reviews@impala.incubator.apache.org MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.14.2 Content-Type: multipart/alternative; boundary="bqkboDOwx0E="; charset=UTF-8 archived-at: Thu, 09 Nov 2017 00:31:48 -0000 --bqkboDOwx0E= Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: quoted-printable Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamin= e a change=2E Please visit http://gerrit=2Ecloudera=2Eorg:8080/8023 t= o look at the new patch set (#13)=2E Change subject: IMPALA-4856: Port dat= a stream service to KRPC =2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E= =2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E= =2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E=2E= =2E=2E=2E=2E IMPALA-4856: Port data stream service to KRPC This patch imp= lements a new data stream service which utilizes KRPC=2E Similar to the thr= ift RPC implementation, there are 3 major components to the data stream ser= vices: KrpcDataStreamSender serializes and sends row batches materialized b= y a fragment instance to a KrpcDataStreamRecvr=2E KrpcDataStreamMgr is resp= onsible for routing an incoming row batch to the appropriate receiver=2E Th= e data stream service runs on the port FLAGS_krpc_port which is 29000 by de= fault=2E Unlike the implementation with thrift RPC, KRPC provides an async= hronous interface for invoking remote methods=2E As a result, KrpcDataStrea= mSender doesn't need to create a thread per connection=2E There is one conn= ection between two Impalad nodes for each direction (i=2Ee=2E client and se= rver)=2E Multiple queries can multi-plex on the same connection for transmi= tting row batches between two Impalad nodes=2E The asynchronous interface a= lso prevents avoids the possibility that a thread is stuck in the RPC code = for extended amount of time without checking for cancellation=2E A Transmit= Data() call with KRPC is in essence a trio of RpcController, a serialized p= rotobuf request buffer and a protobuf response buffer=2E The call is invoke= d via a DataStreamService proxy object=2E The serialized tuple offsets and = row batches are sent via "sidecars" in KRPC to avoid extra copy into the se= rialized request buffer=2E Each impalad node creates a singleton DataStrea= mService object at start-up time=2E All incoming calls are served by a serv= ice thread pool created as part of DataStreamService=2E By default, the num= ber of service threads equals the number of logical cores=2E The service th= reads are shared across all queries so the RPC handler should avoid blockin= g as much as possible=2E In thrift RPC implementation, we make a thrift thr= ead handling a TransmitData() RPC to block for extended period of time when= the receiver is not yet created when the call arrives=2E In KRPC implement= ation, we store TransmitData() or EndDataStream() requests which arrive bef= ore the receiver is ready in a per-receiver early sender list stored in Krp= cDataStreamMgr=2E These RPC calls will be processed and responded to when t= he receiver is created or when timeout occurs=2E Similarly, there is limit= ed space in the sender queues in KrpcDataStreamRecvr=2E If adding a row bat= ch to a queue in KrpcDataStreamRecvr causes the buffer limit to exceed, the= request will be stashed in a queue for deferred processing=2E The stashed = RPC requests will not be responded to until they are processed so as to exe= rt back pressure to the senders=2E An alternative would be to reply with an= error and the request / row batches need to be sent again=2E This may end = up consuming more network bandwidth than the thrift RPC implementation=2E T= his change adopts the behavior of allowing one stashed request per sender= =2E All rpc requests and responses are serialized using protobuf=2E The eq= uivalent of TRowBatch would be ProtoRowBatch which contains a serialized he= ader about the meta-data of the row batch and two Kudu Slice objects which = contain pointers to the actual data (i=2Ee=2E tuple offsets and tuple data)= =2E This patch is based on an abandoned patch by Henry Robinson=2E TESTIN= G ------- * Builds {exhaustive/debug, core/release, asan} passed with FLAG= S_use_krpc=3Dtrue=2E TO DO ----- * Port some BE tests to KRPC services=2E= Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/= status=2Ecc M be/src/common/status=2Eh M be/src/exec/data-sink=2Ecc M be/sr= c/exec/exchange-node=2Ecc M be/src/exec/kudu-util=2Eh M be/src/rpc/CMakeLis= ts=2Etxt M be/src/rpc/rpc-mgr=2Ecc M be/src/rpc/rpc-mgr=2Eh M be/src/runtim= e/CMakeLists=2Etxt M be/src/runtime/data-stream-mgr-base=2Eh M be/src/runti= me/data-stream-mgr=2Eh M be/src/runtime/data-stream-recvr=2Eh M be/src/runt= ime/data-stream-sender=2Eh M be/src/runtime/exec-env=2Ecc M be/src/runtime/= exec-env=2Eh M be/src/runtime/krpc-data-stream-mgr=2Ecc M be/src/runtime/kr= pc-data-stream-mgr=2Eh M be/src/runtime/krpc-data-stream-recvr=2Ecc M be/sr= c/runtime/krpc-data-stream-recvr=2Eh A be/src/runtime/krpc-data-stream-send= er=2Ecc A be/src/runtime/krpc-data-stream-sender=2Eh M be/src/runtime/row-b= atch=2Ecc M be/src/runtime/row-batch=2Eh M be/src/service/CMakeLists=2Etxt = A be/src/service/data-stream-service=2Ecc A be/src/service/data-stream-serv= ice=2Eh M be/src/service/impala-server=2Ecc M cmake_modules/FindProtobuf=2E= cmake M common/protobuf/CMakeLists=2Etxt A common/protobuf/common=2Eproto A= common/protobuf/data_stream_service=2Eproto A common/protobuf/row_batch=2E= proto M common/thrift/generate_error_codes=2Epy 33 files changed, 3,155 ins= ertions(+), 184 deletions(-) git pull ssh://gerrit=2Ecloudera=2Eorg:294= 18/Impala-ASF refs/changes/23/8023/13 -- To view, visit http://gerrit=2Ecl= oudera=2Eorg:8080/8023 To unsubscribe, visit http://gerrit=2Ecloudera=2Eorg= :8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Mes= sageType: newpatchset Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b32= 3ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael= Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostaf= a Mokhtar Gerrit-Reviewer: Sailesh Mukil --bqkboDOwx0E=--