Mailing-List: contact dev-help@drill.apache.org; run by ezmlm
Reply-To: dev@drill.apache.org
References: <9298D988-1BDF-4F96-A323-069DBEF3553F@apache.org>
Date: Sun, 13 Dec 2015 09:15:41 -0800
Subject: Re: Can we pass the #skipped records with RecordBatch?
From: Jacques Nadeau
To: dev

You seem to be mixing multiple things in your response.

- Why do you say this is complex? It is very simple. Is it because you don't know how it would be implemented? I'm offering to do the vast majority of the work to implement the framework, so you shouldn't use that as a gauge.
- It is designed to provide for multiple different use cases, not just your own. As such, you should expect it to be more general. There is clearly a need to send these messages in directions other than straight up the operator tree. There is also a need to send sideband messages outside the context of a record batch. (We shouldn't be creating fake empty record batches just to send sideband messages; that caused us problems before on the UserRpc, and I don't think we should compound purposes.)
- You should evaluate whether it would solve the use case you presented. I believe it will.

As far as your proposed implementation goes: I think you are confounding communication with the user with traversal of the operator tree. I would assume that each operator may be able to skip records. When you accumulate that information, you would want to know how many skips there were for each operator. The info might look like:

skips: [
  { op: 1:1:1, records: [123,456,789]}
  { op: 1:2:1, records: [123,456,789]}
  { op: 1:1:2, records: [123,456,789]}
  { op: 1:2:2, records: [123,456,789]}
]

In this case, there is no need for operator 1:1:1 to know about operator 1:1:2's skips. It shouldn't even need to manage or move that data. So I believe your requirement is actually to provide a stream of skip records to a separate writer that should be on the edge of the plan.
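To make the per-operator accumulation concrete, here is a minimal Java sketch of a writer on the edge of the plan that collects skip reports keyed by operator id. Everything in it is an assumption for illustration: SkipReportSink, accept, skipsFor, totalSkipped and describe are hypothetical names, not Drill APIs, and the operator id is treated as an opaque string in the "1:1:1" form used in the listing above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink sitting on the edge of the plan. It is not a Drill class.
final class SkipReportSink {
    // Operator id (e.g. "1:1:1") -> positions of the records that operator skipped.
    private final Map<String, List<Long>> skipsByOperator = new LinkedHashMap<>();

    // Each operator reports only its own skips; it never sees another operator's list.
    void accept(String operatorId, long recordPosition) {
        skipsByOperator.computeIfAbsent(operatorId, id -> new ArrayList<>()).add(recordPosition);
    }

    // Skips reported by one operator, or an empty list if it reported none.
    List<Long> skipsFor(String operatorId) {
        List<Long> records = skipsByOperator.get(operatorId);
        return records == null
                ? Collections.<Long>emptyList()
                : Collections.unmodifiableList(records);
    }

    // Total across all operators, e.g. for comparison against a skip threshold.
    long totalSkipped() {
        long total = 0;
        for (List<Long> records : skipsByOperator.values()) {
            total += records.size();
        }
        return total;
    }

    // Renders the accumulated info in roughly the shape of the listing above.
    String describe() {
        StringBuilder out = new StringBuilder("skips: [\n");
        for (Map.Entry<String, List<Long>> e : skipsByOperator.entrySet()) {
            out.append("  { op: ").append(e.getKey())
               .append(", records: ").append(e.getValue()).append(" }\n");
        }
        return out.append("]").toString();
    }

    public static void main(String[] args) {
        SkipReportSink sink = new SkipReportSink();
        sink.accept("1:1:1", 123);
        sink.accept("1:1:1", 456);
        sink.accept("1:2:1", 789);
        System.out.println(sink.describe());
        System.out.println("total skipped: " + sink.totalSkipped());
    }
}
```

The point of the sketch is only that the skip data lives in one place at the edge of the plan: operators push their own reports and nothing in the main operator tree has to carry or merge another operator's list.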

The more I talk through this, the more I wonder whether sideband messages should take the same shape as a separate record batch, with a separate subtree/fragment provided for this purpose. Sideband in that case would be a tee in the plan. For example, imagine this tree:

https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit

As you can see, I believe that the vast majority of the issues that you want to manage with your skip record design can be managed by providing a couple of simple tools: sideband, a sideband sink operator (basically a custom version of the union receiver), and an enhancement to the Screen operator to support a secondary incoming stream with a defined schema that will be transformed into a set of warnings (this also allows fine-grained warnings, or an aggregate in the secondary tree for aggregate warnings).

The key goal here is to avoid introducing new or more complicated interfaces at the execution layer and instead use the logical layer to manage things. I believe this also extends to the concept of $recordIdentifier (or similar). This should simply be a virtual field produced by all record readers (when requested) that includes relevant provenance information. If you want to know which records are problematic, ask for the identifier and then record it in a separate file.

Basically, let's use the highly efficient infrastructure we already have to do new things rather than implementing a new set of classes and concepts.

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu wrote:

> The design scope is very general, but for the applications we are thinking
> about now, this is a bit complex and will make the solutions a little
> indirect. In particular, "data to be sent between any two
> three-coordinate locations" implies that sideband data travels by teleport?
> This is a bit too involved.
> And even for advanced pushdown, communications do not need to be that
> flexible.
>
> My original picture of "sideband" is that the additional information should
> be "associated with" a RecordBatch. That is, this additional information
> should be attached to a particular RecordBatch and cannot travel on its own.
>
> As the RecordBatch flows from upstream to downstream, each operator can
> optionally access or update the sideband message.
> For example, in the application of record-skipping, an operator can see how
> many records have been skipped so far and increment the count if more are
> skipped.
>
> If we go with this design, the place we need to change is the receiver
> side, which needs to decode the sideband info from the incoming buffers.
>
> On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau wrote:
>
> > inline
> >
> > > It seems that SidebandTunnel is point-to-point. That is, there is one
> > > producer and one consumer. No broadcast or topics (multiple consumers of
> > > the same message). Order is preserved. At-most-once (i.e. may lose data in
> > > the event of failure). Producer and consumer may be on the same node or
> > > different nodes. Correct?
> >
> > Yes, you are correct in all of this. Since we don't use UDP in Drill, we do
> > broadcast as a collection of individual p2p calls, all using the same
> > message (and multiple reference counts if using raw bytes).
> >
> > > I’m not sure SidebandTunnel.close is necessary. I would presume that a
> > > SidebandTunnel is closed when its associated statement is closed, and only
> > > then.
> >
> > I started without it. My thought was that we may need to signal that you've
> > gotten all of a sideband stream prior to the close of a particular
> > fragment. If I'm on the downside of an operation reporting multiple skips,
> > I may want to hold off on reporting to the user until I've got all of the
> > messages.
> > One option is for the sender to send a discrete message via the
> > Tunnel close. The other option is an implicit message when the fragment is
> > completed. I like the latter from a cleanliness perspective but think the
> > former may be required. I'm OK with not exposing it publicly at the tunnel
> > level initially; we can always expose it later. I would love to hear
> > whether people think there is going to be a need/use case to continue
> > fragment operation but have another operator know that a sideband stream is
> > complete. Maybe when sending a downstream set of samples on the first 1mm
> > records of a larger scan?
> >
> > > Also, would it be easier if the tunnels were defined as part of the DAG,
> > > and DAG initialization time was the only time that they could be created?
> >
> > That is a really good question. I need to think about it a bit. I'm not
> > sure it is easier, given my initial proposal is to piggy-back on the
> > DataTunnel (which is independent of DAG initialization). However, it
> > might be cleaner if operators have to declare this relationship at
> > initialization time and it is all managed 'outside'.
> >
> > Thanks for the feedback. Will need to think further on your last point
> > especially.
> >
> > > Julian
> > >
> > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau wrote:
> > > >
> > > > Please see some initial thoughts attached. Would love feedback and
> > > > thoughts from others on how we can shape this.
> > > >
> > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > >
> > > > --
> > > > Jacques Nadeau
> > > > CTO and Co-Founder, Dremio
> > > >
> > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong wrote:
> > > >
> > > >> Yes, it would be great to get your thoughts so we can assess the scope of
> > > >> what's involved.
> > > >>
> > > >> Thanks.
> > > >>
> > > >> -- Zelaine
> > > >>
> > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau wrote:
> > > >>
> > > >>> Definitely agree that we shouldn't boil the ocean. That said, I don't
> > > >>> think we should make RecordBatch interface changes without deliberate
> > > >>> design. Same for RPC protocol changes. Part of my internal struggle with
> > > >>> the warning patch is exactly this lack of broader design. I think this is
> > > >>> especially true given the drive to support backwards compatibility.
> > > >>>
> > > >>> I don't think we're talking about a massive undertaking. I'll try to write
> > > >>> up some thoughts later this week to get the ball rolling. Sound good?
> > > >>>
> > > >>> --
> > > >>> Jacques Nadeau
> > > >>> CTO and Co-Founder, Dremio
> > > >>>
> > > >>> +1 on having a framework.
> > > >>> OTOH, as with the warnings implementation, we might want to go ahead with a
> > > >>> simpler implementation while we get a more generic framework design in
> > > >>> place.
> > > >>>
> > > >>> Jacques, do you have any preliminary thoughts on the framework?
> > > >>>
> > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde wrote:
> > > >>>
> > > >>>> +1 for a sideband mechanism.
> > > >>>>
> > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > >>>>
> > > >>>> In the sideband use cases you described, the messages ran in the opposite
> > > >>>> direction to the data. Would the sideband also run in the same direction as
> > > >>>> the data? If so, it could carry warnings, rejected rows, progress
> > > >>>> indications, and (for online aggregation[1]) notifications that a better
> > > >>>> approximate query result is available.
> > > >>>>
> > > >>>> Julian
> > > >>>>
> > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > >>>>
> > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau wrote:
> > > >>>>>
> > > >>>>> This seems like a form of sideband communication. I think we should have a
> > > >>>>> framework for this type of thing in general rather than a one-off for this
> > > >>>>> particular need. Other forms of sideband might be small table bloomfilter
> > > >>>>> generation and pushdown into hbase, separate file assignment/partitioning
> > > >>>>> providers balancing/generating scanner workloads, statistics generation for
> > > >>>>> adaptive execution, etc.
> > > >>>>>
> > > >>>>> --
> > > >>>>> Jacques Nadeau
> > > >>>>> CTO and Co-Founder, Dremio
> > > >>>>>
> > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu <hyichu@maprtech.com> wrote:
> > > >>>>>
> > > >>>>>> I am trying to deal with the following scenario:
> > > >>>>>>
> > > >>>>>> A bunch of minor fragments are doing things in parallel. Each of them could
> > > >>>>>> skip some records. Since the downstream minor fragment needs to know the
> > > >>>>>> sum of skipped-record-counts (in order to just display or see if the number
> > > >>>>>> exceeds the threshold) in the upstreams, each upstream minor fragment needs
> > > >>>>>> to pass this scalar with RecordBatch.
> > > >>>>>>
> > > >>>>>> Since this seems to impact the protocol of RecordBatch, I am looking for
> > > >>>>>> some advice here.
> > > >>>>>>
> > > >>>>>> Thanks.