drill-dev mailing list archives

From Jacques Nadeau <jacq...@dremio.com>
Subject Re: Can we pass the #skipped records with RecordBatch?
Date Mon, 14 Dec 2015 14:34:38 GMT
I'm suggesting that $recordIdentifier be a new virtual field similar to
dir0. I think that should be the case no matter what approach we take.
On Dec 13, 2015 10:52 PM, "Hsuan Yi Chu" <hyichu@maprtech.com> wrote:

> I think I missed one thing in the second solution.
>
> Drill needs to report the locations of the source of skipped records, which
> are known only at Scan. It seems hop-by-hop is needed to carry that
> information.
>
> On Sun, Dec 13, 2015 at 4:36 PM, Jacques Nadeau <jacques@dremio.com>
> wrote:
>
> > If your goal is early termination, sending the messages back as quickly as possible to the Screen or similar centralized operator will allow you to respond quickly. Remember that there will likely be many fragments executing in parallel.
> >
> > --
> > Jacques Nadeau
> > CTO and Co-Founder, Dremio
> >
> > On Sun, Dec 13, 2015 at 2:46 PM, Hsuan Yi Chu <hyichu@maprtech.com> wrote:
> >
> > > On Sun, Dec 13, 2015 at 9:15 AM, Jacques Nadeau <jacques@dremio.com>
> > > wrote:
> > >
> > > > You seem to be mixing multiple things in your response.
> > > >
> > > > - Why do you say this is complex? It is very simple. Is it because you don't know how it would be implemented? I'm offering to do the vast majority of the work to implement the framework, so you shouldn't use that as a gauge.
> > > > - It is designed to provide for multiple different use cases, not just your own. As such, you should expect it to be more general. There is clearly a need to provide these messages in directions other than straight up the operator tree. There is also a need to provide sideband messages outside the context of a record batch. (We shouldn't be creating fake empty record batches just to send sideband messages; that caused us problems before on the UserRpc, and I don't think we should compound purposes.)
> > > > - You should evaluate whether it would solve the use case you presented. I believe it will.
> > > >
> > > > As far as your proposed implementation goes: I think you are confounding communication with the user with traversal of the operator tree. I would assume that each operator may be able to skip records. When you accumulate that information, you would want to know how many records were skipped by each operator. The info might look like:
> > > >
> > > > skips: [
> > > >   { op: 1:1:1, records: [123,456,789] },
> > > >   { op: 1:2:1, records: [123,456,789] },
> > > >   { op: 1:1:2, records: [123,456,789] },
> > > >   { op: 1:2:2, records: [123,456,789] }
> > > > ]
> > > >
> > > > In this case, there is no need for operator 1:1:1 to know about operator 1:1:2's skips. It shouldn't even need to manage or move that data. So I believe your requirements are actually to provide a stream of skip records to a separate writer that should be on the edge of the plan. The more I talk through this, the more I wonder whether sideband messages should take the same shape as a separate record batch stream, and whether we need to provide a separate subtree/fragment for this purpose. Sideband in that case would be a tee in the plan.
> > > >
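The per-operator listing in this message suggests a simple shape for the data that a skip writer at the edge of the plan would accumulate: a map from operator ID to the record positions that operator skipped. The following Java sketch assumes operator IDs are opaque strings such as `1:1:1`. `SkipLedger` is a hypothetical name, not a Drill class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SkipLedgerDemo {
    public static void main(String[] args) {
        SkipLedger ledger = new SkipLedger();
        for (long r : new long[] {123, 456, 789}) {
            ledger.recordSkip("1:1:1", r);
            ledger.recordSkip("1:2:1", r);
        }
        System.out.println(ledger.render());
        System.out.println("total = " + ledger.totalSkips());
    }
}

// Hypothetical accumulator at the skip writer: each operator's skips live under
// its own key, so no operator needs to read or forward another operator's data.
final class SkipLedger {
    private final Map<String, List<Long>> skipsByOperator = new TreeMap<>();

    void recordSkip(String operatorId, long recordIndex) {
        skipsByOperator.computeIfAbsent(operatorId, k -> new ArrayList<>()).add(recordIndex);
    }

    List<Long> skipsFor(String operatorId) {
        return Collections.unmodifiableList(
            skipsByOperator.getOrDefault(operatorId, Collections.emptyList()));
    }

    long totalSkips() {
        long total = 0;
        for (List<Long> records : skipsByOperator.values()) {
            total += records.size();
        }
        return total;
    }

    // Renders the same shape as the "skips: [...]" listing in the email.
    String render() {
        StringBuilder sb = new StringBuilder("skips: [\n");
        for (Map.Entry<String, List<Long>> e : skipsByOperator.entrySet()) {
            sb.append("  { op: ").append(e.getKey())
              .append(", records: ").append(e.getValue()).append(" }\n");
        }
        return sb.append("]").toString();
    }
}
```

Each operator only ever appends under its own key. As a result, operator 1:1:1 never has to see or forward the entries of 1:1:2.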
> > >
> > > For the case of skipping records, we will have a threshold, which defines the bound on the # of skipped records before Drill fails the query. Thus, if operator 1:1:1 can be informed of how many records have been skipped in the upstream operators, we could fail the query earlier.
> > >
> > > With this in mind, we could have two solutions to fail the query earlier:
> > > 1. Let the sideband message hop from upstream to downstream. Along the way, each operator decides whether to fail the query because the threshold is exceeded.
> > >
> > > 2. While each operator works independently, the sideband sink operator would be the one and only one that knows how many records have been skipped. Once the threshold is exceeded, this sink operator will be responsible for stopping the query (via another sideband message to inform the foreman).
> > >
> > > When I read the proposal, I was thinking about the first solution. Certainly, the second one seems to leverage sideband better.
> > >
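Solution 2 can be sketched as a single sink that sums the skip counts reported by many parallel fragments and requests cancellation once the threshold is crossed. This is a minimal Java sketch that rests on several assumptions. `SkipThresholdSink` is hypothetical. The `Runnable` stands in for the sideband message to the foreman, and thread-pool workers stand in for fragments.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class SkipThresholdDemo {
    public static void main(String[] args) throws InterruptedException {
        SkipThresholdSink sink = new SkipThresholdSink(100,
            () -> System.out.println("threshold exceeded: asking foreman to cancel"));
        ExecutorService fragments = Executors.newFixedThreadPool(4);
        for (int f = 0; f < 4; f++) {
            fragments.submit(() -> {
                for (int batch = 0; batch < 10; batch++) {
                    sink.onSkipReport(3); // each batch reports 3 skipped records
                }
            });
        }
        fragments.shutdown();
        fragments.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("total skipped = " + sink.totalSkipped());
    }
}

// Hypothetical sideband sink: the only place that knows the global skip count.
final class SkipThresholdSink {
    private final long threshold;
    private final Runnable cancelQuery; // stands in for the message to the foreman
    private final AtomicLong skipped = new AtomicLong();
    private final AtomicBoolean cancelled = new AtomicBoolean();

    SkipThresholdSink(long threshold, Runnable cancelQuery) {
        this.threshold = threshold;
        this.cancelQuery = cancelQuery;
    }

    // Called once per sideband message, from any fragment, possibly concurrently.
    void onSkipReport(long skippedInBatch) {
        long total = skipped.addAndGet(skippedInBatch);
        // compareAndSet ensures the cancel request is issued at most once.
        if (total > threshold && cancelled.compareAndSet(false, true)) {
            cancelQuery.run();
        }
    }

    long totalSkipped() { return skipped.get(); }

    boolean isCancelled() { return cancelled.get(); }
}
```

Under solution 1, the same comparison would run in every operator along the path, and each operator would need the cumulative upstream count delivered to it. In this sketch only the sink holds the total.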
> > > > For example, imagine this tree:
> > > >
> > > > https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit
> > > >
> > > > As you can see, I believe that the vast majority of the issues that you want to manage with your skip record design can be managed by providing a couple of simple tools: sideband, a sideband sink operator (basically a custom version of the union receiver), and an enhancement to the Screen operator to support a secondary incoming stream with a defined schema that will be transformed into a set of warnings. (This also allows fine-grained warnings, or an aggregate in the secondary tree for aggregate warnings.)
> > > >
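The Screen enhancement can be illustrated as two ways of turning rows from a secondary stream with a fixed schema into user-facing warnings. One option emits a warning per skipped record. The other aggregates and emits one warning per operator. `SkipRow` and `WarningFormatter` are hypothetical, and the two-column schema is an assumption. Drill's Screen operator itself is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WarningDemo {
    public static void main(String[] args) {
        List<SkipRow> secondary = List.of(
            new SkipRow("1:1:1", 123), new SkipRow("1:1:1", 456), new SkipRow("1:2:1", 789));
        WarningFormatter.fineGrained(secondary).forEach(System.out::println);
        WarningFormatter.aggregated(secondary).forEach(System.out::println);
    }
}

// Hypothetical row of the secondary stream's "defined schema".
final class SkipRow {
    final String operatorId;
    final long recordIndex;

    SkipRow(String operatorId, long recordIndex) {
        this.operatorId = operatorId;
        this.recordIndex = recordIndex;
    }
}

final class WarningFormatter {
    // One warning per skipped record.
    static List<String> fineGrained(List<SkipRow> rows) {
        List<String> out = new ArrayList<>();
        for (SkipRow r : rows) {
            out.add("WARN op " + r.operatorId + " skipped record " + r.recordIndex);
        }
        return out;
    }

    // One warning per operator, as an aggregate in the secondary tree would produce.
    static List<String> aggregated(List<SkipRow> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (SkipRow r : rows) {
            counts.merge(r.operatorId, 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((op, n) -> out.add("WARN op " + op + " skipped " + n + " record(s)"));
        return out;
    }
}
```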
> > > > The key goal here is to avoid introducing new or more complicated interfaces at the execution layer and instead use the logical layer to manage things. I believe this also extends to the concept of $recordIdentifier (or similar). This should simply be a virtual field produced by all record readers (when requested) that includes relevant provenance information. If you want to know which records are problematic, ask for the identifier and then record it in a separate file. Basically, let's use the highly efficient infrastructure we already have to do new things rather than implementing a new set of classes and concepts.
> > > >
> > > >
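A rough Java sketch of the `$recordIdentifier` idea: a reader that adds a provenance column to each row only when asked, so identifying problematic records becomes an ordinary projection. The identifier format (source path plus row number), the `ProvenanceReader` class, and the `includeIdentifier` flag are all assumptions for illustration. This is not Drill's `RecordReader` interface.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RecordIdentifierDemo {
    public static void main(String[] args) {
        ProvenanceReader reader = new ProvenanceReader("/data/2015/12/part-0.json", true);
        for (Map<String, Object> row : reader.read(List.of("a", "b"))) {
            System.out.println(row);
        }
    }
}

// Hypothetical reader: adds a "$recordIdentifier" column only when requested,
// similar in spirit to a virtual field such as dir0.
final class ProvenanceReader {
    static final String RECORD_ID = "$recordIdentifier";
    private final String sourcePath;
    private final boolean includeIdentifier;

    ProvenanceReader(String sourcePath, boolean includeIdentifier) {
        this.sourcePath = sourcePath;
        this.includeIdentifier = includeIdentifier;
    }

    List<Map<String, Object>> read(List<String> rawValues) {
        List<Map<String, Object>> rows = new ArrayList<>();
        long rowNumber = 0;
        for (String value : rawValues) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("value", value);
            if (includeIdentifier) {
                // Provenance that only the scan knows: which file, and which row in it.
                row.put(RECORD_ID, sourcePath + "#" + rowNumber);
            }
            rows.add(row);
            rowNumber++;
        }
        return rows;
    }
}
```

With such a column, writing the identifiers of rejected rows to a separate file is ordinary query work and does not require a new execution-layer concept.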
> > > > --
> > > > Jacques Nadeau
> > > > CTO and Co-Founder, Dremio
> > > >
> > > > On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu <hyichu@maprtech.com> wrote:
> > > >
> > > > > The design scope is very general, but for the applications we are thinking about now, it is a bit complex and will make the solutions a little indirect. In particular, this one, "data to be sent between any two three-coordinate locations", implies sideband data goes in teleport? This is a bit too involved. And even for advanced pushdown, communication does not need to be that flexible.
> > > > >
> > > > > My original picture of "sideband" is that the additional information should be "associated with" a RecordBatch. That means this additional information should be attached to a particular RecordBatch and cannot travel on its own.
> > > > >
> > > > > As the RecordBatch flows from upstream to downstream, each operator can optionally access or update the sideband message. For example, in the application of record skipping, an operator can see how many records were skipped so far and increment the count if more are skipped.
> > > > >
> > > > > If we go with this design, the place we need to change is on the receiver side, which needs to decode the sideband info from the incoming buffers.
> > > > >
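For contrast, the batch-attached design described in this message can be sketched as a wrapper in which a skip count travels with each batch and every operator may read or increment it. `AnnotatedBatch` and `FilteringOperator` are hypothetical. Drill's actual `RecordBatch` interface is much richer and is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

class AttachedSidebandDemo {
    public static void main(String[] args) {
        // The scan already skipped 2 records before producing this batch.
        AnnotatedBatch fromScan = new AnnotatedBatch(List.of("1", "x", "3"), 2);
        AnnotatedBatch out = FilteringOperator.process(fromScan);
        System.out.println(out.rows + " skippedSoFar=" + out.skippedSoFar);
    }
}

// Hypothetical batch: rows plus a sideband count that cannot travel on its own.
final class AnnotatedBatch {
    final List<String> rows;
    final long skippedSoFar;

    AnnotatedBatch(List<String> rows, long skippedSoFar) {
        this.rows = rows;
        this.skippedSoFar = skippedSoFar;
    }
}

final class FilteringOperator {
    // Drops rows that are not integers and adds its own skips to the inherited count.
    static AnnotatedBatch process(AnnotatedBatch in) {
        List<String> kept = new ArrayList<>();
        long skippedHere = 0;
        for (String row : in.rows) {
            try {
                Integer.parseInt(row);
                kept.add(row);
            } catch (NumberFormatException e) {
                skippedHere++;
            }
        }
        return new AnnotatedBatch(kept, in.skippedSoFar + skippedHere);
    }
}
```

In this shape the count can only move when a batch moves. The thread raises the same limitation about sending sideband messages outside the context of a record batch.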
> > > > > On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau <jacques@dremio.com> wrote:
> > > > >
> > > > > > inline
> > > > > >
> > > > > > > It seems that SidebandTunnel is point-to-point. That is, there is one producer and one consumer. No broadcast or topics (multiple consumers of the same message). Order is preserved. At-most-once (i.e. may lose data in event of failure). Producer and consumer may be on the same node or different nodes. Correct?
> > > > > > >
> > > > > >
> > > > > > Yes, you are correct in all of this. Since we don't use UDP in Drill, we do broadcast as a collection of individual p2p calls, all using the same message (and multiple reference counts if using raw bytes).
> > > > > >
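Broadcast as a collection of point-to-point calls that share one message can be sketched with a reference count. The payload is retained once per in-flight send and released when each send completes, so it is never copied per destination. `SharedMessage` and `P2pBroadcast` are hypothetical, and the synchronous `BiConsumer` standing in for a p2p send is a simplification. In Drill, the counting would apply to raw byte buffers rather than to a Java object like this.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class BroadcastDemo {
    public static void main(String[] args) {
        SharedMessage msg = new SharedMessage("skips=42".getBytes(StandardCharsets.UTF_8));
        P2pBroadcast.send(msg, List.of("node-a", "node-b", "node-c"),
            (dest, m) -> System.out.println("sent " + m.payload.length + " bytes to " + dest));
        System.out.println("released = " + msg.isReleased());
    }
}

// Hypothetical reference-counted payload shared by all point-to-point sends.
final class SharedMessage {
    final byte[] payload;
    private final AtomicInteger refCount = new AtomicInteger(1); // the creator's reference

    SharedMessage(byte[] payload) { this.payload = payload; }

    void retain() { refCount.incrementAndGet(); }

    // A real implementation would free or recycle the buffer when this reaches zero.
    void release() { refCount.decrementAndGet(); }

    boolean isReleased() { return refCount.get() == 0; }
}

final class P2pBroadcast {
    // One p2p call per destination; the same message object is reused, not copied.
    static void send(SharedMessage msg, List<String> destinations,
                     BiConsumer<String, SharedMessage> p2pSend) {
        for (String dest : destinations) {
            msg.retain(); // one reference per in-flight send
            try {
                p2pSend.accept(dest, msg);
            } finally {
                msg.release(); // the send completed (synchronously in this sketch)
            }
        }
        msg.release(); // drop the creator's reference
    }
}
```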
> > > > > >
> > > > > > >
> > > > > > > I’m not sure SidebandTunnel.close is necessary. I would presume that a SidebandTunnel is closed when its associated statement is closed, and only then.
> > > > > > >
> > > > > >
> > > > > > I started without it. My thought was that we may need to signal that you've gotten all of a sideband stream prior to the close of a particular fragment. If I'm on the downstream side of an operation reporting multiple skips, I may want to hold off on reporting to the user until I've gotten all of the messages. One option is for the sender to send a discrete message via the Tunnel close. The other option is an implicit message when the fragment is completed. I like the latter from a cleanliness perspective but think the former may be required. I'm OK with not exposing this publicly at the tunnel level initially; we can always expose it later. I would love to hear whether people think there is going to be a need/use case to continue fragment operation but have another operator know that a sideband stream is complete. Maybe when sending a downstream set of samples on the first 1mm records of a larger scan?
> > > > > >
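The "hold off until I have all the messages" case can be sketched as a consumer that counts end-of-stream signals from each expected producer and reports only after the last one arrives. Whether that signal comes from an explicit tunnel close or implicitly from fragment completion is left abstract as `onStreamComplete`. All names are hypothetical, and the class is single-threaded for brevity.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongConsumer;

class StreamCompletionDemo {
    public static void main(String[] args) {
        SkipReportCollector c = new SkipReportCollector(Set.of("1:1", "1:2"),
            total -> System.out.println("report to user: " + total + " skipped"));
        c.onSkips("1:1", 5);
        c.onStreamComplete("1:1"); // nothing reported yet: 1:2 is still open
        c.onSkips("1:2", 7);
        c.onStreamComplete("1:2"); // last stream closed, so the total is reported
    }
}

// Hypothetical consumer that defers reporting until every producer's stream has ended.
// Not thread-safe; a real consumer would need synchronization.
final class SkipReportCollector {
    private final Set<String> openStreams;
    private final LongConsumer reporter;
    private long total;

    SkipReportCollector(Set<String> producers, LongConsumer reporter) {
        this.openStreams = new HashSet<>(producers);
        this.reporter = reporter;
    }

    void onSkips(String producer, long count) {
        if (!openStreams.contains(producer)) {
            throw new IllegalStateException("message after end of stream from " + producer);
        }
        total += count;
    }

    // End-of-stream signal, whether explicit (tunnel close) or implicit (fragment done).
    void onStreamComplete(String producer) {
        if (openStreams.remove(producer) && openStreams.isEmpty()) {
            reporter.accept(total);
        }
    }
}
```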
> > > > > >
> > > > > > > Also, would it be easier if the tunnels were defined as part of the DAG, and DAG initialization time was the only time that they could be created?
> > > > > > >
> > > > > >
> > > > > > That is a really good question. I need to think about it a bit. I'm not sure it is easier, given that my initial proposal is to piggy-back on the DataTunnel (which is independent of DAG initialization). However, it might be cleaner if operators have to declare this relationship at initialization time and it is all managed 'outside'.
> > > > > >
> > > > > > Thanks for the feedback. I will need to think further on your last point especially.
> > > > > >
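Julian's alternative is to declare tunnels as part of the DAG so they can only be created during initialization. It can be sketched as a registry that accepts declarations until the plan is frozen and rejects them afterwards. `TunnelRegistry` and the string edge representation are hypothetical. The sketch says nothing about how the DataTunnel piggy-back approach would work.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class TunnelRegistryDemo {
    public static void main(String[] args) {
        TunnelRegistry registry = new TunnelRegistry();
        registry.declare("scan-1:1", "sideband-sink");
        registry.declare("scan-1:2", "sideband-sink");
        registry.freeze(); // end of DAG initialization
        System.out.println(registry.edges());
        try {
            registry.declare("filter-2:1", "sideband-sink");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Hypothetical registry: sideband edges are fixed up front and managed outside operators.
final class TunnelRegistry {
    private final Set<String> edges = new LinkedHashSet<>();
    private boolean frozen;

    void declare(String producer, String consumer) {
        if (frozen) {
            throw new IllegalStateException("tunnels can only be declared during DAG initialization");
        }
        edges.add(producer + " -> " + consumer);
    }

    void freeze() { frozen = true; }

    Set<String> edges() { return Collections.unmodifiableSet(edges); }
}
```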
> > > > > >
> > > > > > >
> > > > > > > Julian
> > > > > > >
> > > > > > >
> > > > > > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau <jacques@dremio.com> wrote:
> > > > > > > >
> > > > > > > > Please see some initial thoughts attached. Would love feedback and thoughts from others on how we can shape this.
> > > > > > > >
> > > > > > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > > > > > >
> > > > > > > > --
> > > > > > > > Jacques Nadeau
> > > > > > > > CTO and Co-Founder, Dremio
> > > > > > > >
> > > > > > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong <zfong@maprtech.com> wrote:
> > > > > > > >
> > > > > > > >> Yes, it would be great to get your thoughts so we can assess the scope of what's involved.
> > > > > > > >>
> > > > > > > >> Thanks.
> > > > > > > >>
> > > > > > > >> -- Zelaine
> > > > > > > >>
> > > > > > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau <jacques@dremio.com> wrote:
> > > > > > > >>
> > > > > > > >>> Definitely agree that we shouldn't boil the ocean. That said, I don't think we should make RecordBatch interface changes without deliberate design. Same for RPC protocol changes. Part of my internal struggle with the warning patch is exactly this lack of broader design. I think this is especially true given the drive to support backwards compatibility.
> > > > > > > >>>
> > > > > > > >>> I don't think we're talking about a massive undertaking. I'll try to write up some thoughts later this week to get the ball rolling. Sound good?
> > > > > > > >>>
> > > > > > > >>> --
> > > > > > > >>> Jacques Nadeau
> > > > > > > >>> CTO and Co-Founder, Dremio
> > > > > > > >>>
> > > > > > > >>> +1 on having a framework.
> > > > > > > >>> OTOH, as with the warnings implementation, we might want to go ahead with a simpler implementation while we get a more generic framework design in place.
> > > > > > > >>>
> > > > > > > >>> Jacques, do you have any preliminary thoughts on the framework?
> > > > > > > >>>
> > > > > > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde <jhyde@apache.org> wrote:
> > > > > > > >>>
> > > > > > > >>>> +1 for a sideband mechanism.
> > > > > > > >>>>
> > > > > > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > > > > > >>>>
> > > > > > > >>>> In the sideband use cases you described, the messages ran in the opposite direction to the data. Would the sideband also run in the same direction as the data? If so, it could carry warnings, rejected rows, progress indications, and (for online aggregation[1]) notifications that a better approximate query result is available.
> > > > > > > >>>>
> > > > > > > >>>> Julian
> > > > > > > >>>>
> > > > > > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau <jacques@dremio.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>> This seems like a form of sideband communication. I think we should have a framework for this type of thing in general rather than a one-off for this particular need. Other forms of sideband might be small-table bloom filter generation and pushdown into HBase, separate file assignment/partitioning providers balancing/generating scanner workloads, statistics generation for adaptive execution, etc.
> > > > > > > >>>>>
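Small-table bloom filter generation is one of the other sideband forms listed in this message. A bloom filter built from the small side of a join is a compact bit array that could be shipped to a scan (HBase, in the message's example). The scan can then drop rows whose key is definitely absent. The sketch below is a textbook bloom filter using double hashing, not anything in Drill. The bit-array size, the hash count, and the way the second hash is derived are illustrative choices.

```java
import java.util.BitSet;

class BloomFilterDemo {
    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 12, 3);
        for (String key : new String[] {"alice", "bob", "carol"}) {
            filter.add(key); // built from the small table
        }
        System.out.println(filter.mightContain("bob")); // always true for added keys
        System.out.println(filter.mightContain("zed")); // usually false; false positives are possible
    }
}

// Textbook bloom filter with double hashing: index_i = h1 + i * h2 (mod m).
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.reverse(h1) * 0x9E3779B1 | 1; // derived second hash, forced odd
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false; // definitely never added
            }
        }
        return true; // possibly added
    }
}
```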
> > > > > > > >>>>> --
> > > > > > > >>>>> Jacques Nadeau
> > > > > > > >>>>> CTO and Co-Founder, Dremio
> > > > > > > >>>>>
> > > > > > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu <hyichu@maprtech.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> I am trying to deal with the following scenario:
> > > > > > > >>>>>>
> > > > > > > >>>>>> A bunch of minor fragments are doing things in parallel, and each of them could skip some records. The downstream minor fragment needs to know the sum of the upstream skipped-record counts, either just to display it or to check whether it exceeds the threshold. So each upstream minor fragment needs to pass this scalar along with its RecordBatch.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Since this seems to impact the protocol of RecordBatch, I am looking for some advice here.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thanks.
> > > > > > > >>>>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
