From: "Sailesh Mukil (Code Review)"
To: Michael Ho, impala-cr@cloudera.com, reviews@impala.incubator.apache.org
Reply-To: sailesh@cloudera.com, impala-cr@cloudera.com, marcelk@gmail.com, kwho@cloudera.com, reviews@impala.incubator.apache.org
Date: Mon, 25 Sep 2017 14:49:37 +0000
Subject: [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Message-Id: <201709251449.v8PEncSG022379@ip-10-146-233-104.ec2.internal>
Mailing-List: contact reviews-help@impala.incubator.apache.org; run by ezmlm
X-Gerrit-MessageType: comment
X-Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
X-Gerrit-Change-Number: 8023
X-Gerrit-PatchSet: 1
X-Gerrit-Commit: 89a448d6e8dbb5666b50023edfc4271955e28af6
User-Agent: Gerrit/2.14.2

Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(14 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
nit: received


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             :   /// payload.
Why would there be a TransmitData() RPC if EndDataStream() has already been sent? Doesn't the sender send it only if it knows all its TransmitData() RPCs have been processed?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here for later fixing.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             :   /// not. That's enough state to realise that a receiver has failed (rather than not
             :   /// prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             :   /// having the closed-stream list.
It would be nice to have a JIRA for this and reference it here.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
This is a little confusing to follow in the .cc file, since when I see "waiting_senders", I expect it to be a set of some unique identifiers for a Sender ID.

Although this is unique to a specific sender, it would be a little clearer to call this 'waiting_senders_ctxs'. Let me know what you think.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
Similarly, we could call this 'closed_senders_ctxs'.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168: num_senders_waiting_->Increment(1);
             :     total_senders_waited_->Increment(1);
             :     RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             :     auto payload =
             :         make_unique(proto_batch, context, request, response);
             :     early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
I'm wondering if it makes sense to add simple inline functions that encapsulate this functionality; for the sake of readability.

Eg: AddEarlyWaitingSender(), AddEarlyClosedSender()


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
nit: If no receiver is found, and the receiver is not in the closed stream cache as well, we still need...


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218: RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             :       auto payload = make_unique(context, request, response);
             :       early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             :       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
AddEarlyClosedSender() as per comment above, if you agree.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227: if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             :   Status::OK().ToProto(response->mutable_status());
             :   context->RespondSuccess();
This may need some modification based on the recent commit for IMPALA-5199:
https://github.com/apache/incubator-impala/commit/5119ced50c0e0c4001621c9d4da598c187bdb580


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
I'm having some trouble understanding what this means. Could you please clarify?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271: data_arrival_cv_.notify_all();
Shouldn't this notify be done while holding the lock_?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285: while (!blocked_senders_.empty()) {
nit: Add comment: Respond to blocked senders' RPCs


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc@208
PS1, Line 208: %
Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compiler take care of that?


--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho
Gerrit-Reviewer: Sailesh Mukil
Gerrit-Comment-Date: Mon, 25 Sep 2017 14:49:37 +0000
Gerrit-HasComments: Yes
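
Note on the krpc-data-stream-recvr.cc line 271 question (notify under lock_): a minimal sketch of the usual condition-variable pattern, using std::mutex and std::condition_variable. BatchQueue, Push() and Pop() are hypothetical stand-ins and are not taken from the patch; only the member names lock_ and data_arrival_cv_ echo the review. The sketch assumes the waiter always re-checks its predicate under the lock, in which case notifying after the unlock does not lose a wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in for a receiver-side queue of row batches.
// Batches are plain ints here to keep the sketch self-contained.
class BatchQueue {
 public:
  // Producer side: mutate the shared state under lock_, then notify.
  void Push(int batch) {
    {
      std::lock_guard<std::mutex> l(lock_);
      batches_.push_back(batch);
    }
    // The state change above happened under lock_, and Pop() re-checks
    // the predicate under lock_, so a waiter either sees the new batch
    // before blocking or is already blocked and receives this wakeup.
    data_arrival_cv_.notify_all();
  }

  // Consumer side: block until a batch is available, then take it.
  int Pop() {
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] { return !batches_.empty(); });
    assert(!batches_.empty());
    int batch = batches_.front();
    batches_.pop_front();
    return batch;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<int> batches_;
};
```

One case where notifying while still holding the lock can matter is object lifetime: if a woken thread may destroy the object that owns the condition variable, a notify issued after the unlock can touch freed memory. Whether that applies to the receiver in this patch is not something the review text settles.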
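
Note on the krpc-data-stream-sender.cc line 208 question (% versus & 1): a small sketch showing that the two forms agree for a two-slot index. NextIdxModulo(), NextIdxBitwise() and FlipsAgree() are hypothetical helpers, and the sketch assumes cur_batch_idx_ only ever holds 0 or 1. For non-negative operands the results are identical, and mainstream optimizing compilers typically lower a modulo by a constant power of two to a mask (directly for unsigned types, with a little extra sign handling for signed ones), so the choice is likely one of readability rather than speed.

```cpp
#include <cassert>

// Hypothetical stand-ins for the index flip discussed at line 208.
// Assumption: the index only ever holds 0 or 1 (two batch slots).
constexpr int NextIdxModulo(int cur) { return (cur + 1) % 2; }
constexpr int NextIdxBitwise(int cur) { return (cur + 1) & 1; }

// Compile-time check that both forms agree on every valid index.
static_assert(NextIdxModulo(0) == NextIdxBitwise(0), "forms differ at 0");
static_assert(NextIdxModulo(1) == NextIdxBitwise(1), "forms differ at 1");

// Runtime check: flip both indices 'steps' times and compare each step.
inline bool FlipsAgree(int steps) {
  assert(steps >= 0);
  int a = 0;
  int b = 0;
  for (int i = 0; i < steps; ++i) {
    a = NextIdxModulo(a);
    b = NextIdxBitwise(b);
    if (a != b) return false;
  }
  return true;
}
```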