drill-dev mailing list archives

From Hsuan Yi Chu <hyi...@maprtech.com>
Subject Re: Can we pass the #skipped records with RecordBatch?
Date Thu, 17 Dec 2015 19:14:09 GMT
Let me restate the model to make sure we are on the same page.

At planning:
After parallelization is done, we know the minor fragments. Then a
"planner for sideband" (which still needs to be implemented) will decide
which operator opens a sideband to talk to which other operator. Also at
planning, Drill will choose a "max-size" and a "max-interval" for sending
sideband record batches.

(Take the record-skipping application as an example. At planning, we need
to generate a plan that instructs each operator how to talk to the
"sideband sink". In this particular use case, the max-size and
max-interval for sideband record batches are supposed to be small.)
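
To make the planning output concrete, here is a rough sketch of what the
sideband planner could emit. Nothing here is existing Drill code:
SidebandLink, SidebandPlanner, the field names, and the "major:minor:operator"
id strings are all hypothetical and only illustrate the idea of one link per
producing operator, each carrying the chosen max-size and max-interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planning-time description of one sideband link. */
final class SidebandLink {
    final String producerOp;   // id of the operator that opens the sideband
    final String sinkOp;       // id of the sideband sink it talks to
    final int maxSizeRecords;  // send once this many records are buffered
    final long maxIntervalMs;  // or once this much time has passed

    SidebandLink(String producerOp, String sinkOp,
                 int maxSizeRecords, long maxIntervalMs) {
        this.producerOp = producerOp;
        this.sinkOp = sinkOp;
        this.maxSizeRecords = maxSizeRecords;
        this.maxIntervalMs = maxIntervalMs;
    }
}

/** Hypothetical planner step that runs after parallelization. */
final class SidebandPlanner {
    /** Links every producing operator to one sink with the same limits. */
    static List<SidebandLink> plan(List<String> producerOps, String sinkOp,
                                   int maxSizeRecords, long maxIntervalMs) {
        List<SidebandLink> links = new ArrayList<>();
        for (String op : producerOps) {
            links.add(new SidebandLink(op, sinkOp, maxSizeRecords, maxIntervalMs));
        }
        return links;
    }
}
```

For record skipping, the planner would be called with small limits, for
example a handful of records and a short interval, so that skips reach the
sink quickly.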

At execution:
Each operator that was asked to open a sideband tunnel will do so. A
sideband record batch will be sent into the tunnel when either max-size
or max-interval is reached. The data transmission would follow the pull
model.
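
A minimal sketch of the "either max-size or max-interval" rule, under my own
assumptions: SidebandBuffer is a hypothetical class, records are plain
strings, and "pull model" is read as the consumer asking for the next batch.
The clock is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that sits in front of a sideband tunnel. */
final class SidebandBuffer {
    private final int maxSize;
    private final long maxIntervalMs;
    private final List<String> pending = new ArrayList<>();
    private long lastSendMs;

    SidebandBuffer(int maxSize, long maxIntervalMs, long nowMs) {
        this.maxSize = maxSize;
        this.maxIntervalMs = maxIntervalMs;
        this.lastSendMs = nowMs;
    }

    /** Called by the producing operator for each sideband record. */
    void add(String record) {
        pending.add(record);
    }

    /**
     * Called by the consumer (pull). Returns the buffered records when
     * max-size or max-interval has been reached, otherwise an empty list.
     */
    List<String> poll(long nowMs) {
        boolean sizeReached = pending.size() >= maxSize;
        boolean intervalReached =
            !pending.isEmpty() && nowMs - lastSendMs >= maxIntervalMs;
        if (!sizeReached && !intervalReached) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        lastSendMs = nowMs;
        return batch;
    }
}
```

With small limits, a skipped record waits at most max-interval before the
sink can see it, which is what the record-skipping case needs.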

On Mon, Dec 14, 2015 at 6:34 AM, Jacques Nadeau <jacques@dremio.com> wrote:

> I'm suggesting that $recordIdentifier be a new virtual field similar to
> dir0. I think that should be the case no matter what approach we take.
> On Dec 13, 2015 10:52 PM, "Hsuan Yi Chu" <hyichu@maprtech.com> wrote:
>
> > I think I missed one thing in the second solution.
> >
> > Drill needs to report the locations of the source of skipped records,
> > which are known only at Scan. It seems hop-by-hop is needed to carry
> > that information.
> >
> > On Sun, Dec 13, 2015 at 4:36 PM, Jacques Nadeau <jacques@dremio.com>
> > wrote:
> >
> > > If your goal is early termination, sending the messages back as
> > > quickly as possible to the Screen or similar centralized operator
> > > will allow you to respond quickly. Remember that there will likely be
> > > many fragments executing in parallel.
> > >
> > > --
> > > Jacques Nadeau
> > > CTO and Co-Founder, Dremio
> > >
> > > On Sun, Dec 13, 2015 at 2:46 PM, Hsuan Yi Chu <hyichu@maprtech.com>
> > > wrote:
> > >
> > > > On Sun, Dec 13, 2015 at 9:15 AM, Jacques Nadeau <jacques@dremio.com>
> > > > wrote:
> > > >
> > > > > You seem to be mixing multiple things in your response.
> > > > >
> > > > > - Why do you say this is complex? It is very simple. Is it because
> > > > > you don't know how it would be implemented? I'm offering to do the
> > > > > vast majority of the work to implement the framework so you
> > > > > shouldn't use that as a gauge.
> > > > > - It is designed to provide for multiple different use cases, not
> > > > > just your own. As such, you should expect it to be more general.
> > > > > There is clearly a need to provide these messages in directions
> > > > > other than straight up the operator tree. There is also a need to
> > > > > provide sideband messages outside the context of a record batch.
> > > > > (We shouldn't be creating fake empty record batches just to send
> > > > > sideband messages; that caused us problems before on the UserRpc
> > > > > and I think we shouldn't compound purposes.)
> > > > > - You should evaluate whether it would solve the use case you
> > > > > presented. I believe it will.
> > > > >
> > > > > As far as your proposed implementation goes: I think you are
> > > > > confounding communication with the user with traversal of the
> > > > > operator tree. I would assume that each operator may be able to
> > > > > skip records. When you accumulate that information, you would want
> > > > > to know how many skips there were for each operator. The info
> > > > > might look like:
> > > > >
> > > > > skips: [
> > > > > { op: 1:1:1, records: [123,456,789]}
> > > > > { op: 1:2:1, records: [123,456,789]}
> > > > > { op: 1:1:2, records: [123,456,789]}
> > > > > { op: 1:2:2, records: [123,456,789]}
> > > > > ]
> > > > >
> > > > > In this case, there is no need for operator 1:1:1 to know about
> > > > > operator 1:1:2's skips. It shouldn't even need to manage or move
> > > > > that data. So I believe your requirements are actually to provide
> > > > > a stream of skip records to a separate writer that should be on
> > > > > the edge of the plan. The more I talk through this, I'm wondering
> > > > > if sideband messages should take the same shape as a separate
> > > > > record batch and that we need to provide a separate
> > > > > subtree/fragment for this purpose. Sideband in that case would be
> > > > > a tee in the plan.
> > > > >
> > > >
> > > > For the case of skipping records, we will have a threshold, which
> > > > defines the bound on the number of skipped records before Drill
> > > > fails the query. Thus, if operator 1:1:1 can be informed of how many
> > > > records have been skipped in the upstream operators, we could fail
> > > > the query earlier.
> > > >
> > > > With this in mind, we could have two solutions to fail the query
> > > > earlier:
> > > > 1. Let the sideband message hop from upstream to downstream. On the
> > > > way, each operator decides to fail the query if the threshold is
> > > > exceeded.
> > > >
> > > > 2. While each operator does its work independently, the sideband
> > > > sink operator would be the one and only operator that knows how many
> > > > records have been skipped. Once the threshold is exceeded, this sink
> > > > operator will be responsible for stopping the query (via another
> > > > sideband message to inform the foreman).
> > > >
> > > > When I read the proposal, I was thinking about the first solution.
> > > > Certainly, the second one seems to leverage sideband better.
> > > >
> > > > For example, imagine this tree:
> > > > >
> > > > > https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit
> > > > >
> > > > > As you can see, I believe that the vast majority of the issues
> > > > > that you want to manage with your skip record design can be
> > > > > managed by providing a couple of simple tools: sideband, sideband
> > > > > sink operator (basically a custom version of the union receiver),
> > > > > and an enhancement to the Screen operator to support a secondary
> > > > > incoming stream with a defined schema that will be transformed
> > > > > into a set of warnings (this also allows fine grained warnings or
> > > > > use an aggregate in the secondary tree for aggregate warnings).
> > > > >
> > > > > The key goal here is trying to avoid the introduction of new or
> > > > > more complicated interfaces at the execution layer and instead use
> > > > > the logical layer to manage things. I believe this also extends to
> > > > > the concept of $recordIdentifier (or similar). This should simply
> > > > > be a virtual field produced by all record readers (when requested)
> > > > > that includes relevant provenance information. If you want to know
> > > > > which records are problematic, ask for the identifier and then
> > > > > record in a separate file. Basically, let's use the highly
> > > > > efficient infrastructure we already have to do new things rather
> > > > > than implementing a new set of classes and concepts.
> > > > >
> > > > >
> > > > > --
> > > > > Jacques Nadeau
> > > > > CTO and Co-Founder, Dremio
> > > > >
> > > > > On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu <hyichu@maprtech.com>
> > > > > wrote:
> > > > >
> > > > > > The design scope is very general, but, for the applications we
> > > > > > are thinking about now, this is a bit complex and will make the
> > > > > > solutions a little bit indirect. Especially, this one "data to
> > > > > > be sent between any two three-coordinate locations" implies
> > > > > > sideband data goes in teleport? This is a bit too involved. And
> > > > > > even for advanced pushdown, it is not necessary to be that
> > > > > > flexible for communications.
> > > > > >
> > > > > > My original picture of "sideband" is that the additional
> > > > > > information should be "associated with" RecordBatch. That means
> > > > > > this additional information should be attached to a particular
> > > > > > RecordBatch and cannot run on its own.
> > > > > >
> > > > > > As the RecordBatch flows from upstream to downstream, the
> > > > > > operator can optionally access or update the sideband message.
> > > > > > For example, in the application of record-skipping, an operator
> > > > > > can see how many records were skipped so far and increment the
> > > > > > count if more are skipped.
> > > > > >
> > > > > > If we go with this design, the place we need to change is on the
> > > > > > receiver side, which needs to decode the sideband info from the
> > > > > > incoming buffers.
> > > > > >
> > > > > > On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau <jacques@dremio.com>
> > > > > > wrote:
> > > > > >
> > > > > > > inline
> > > > > > >
> > > > > > > > It seems that SidebandTunnel is point-to-point. That is,
> > > > > > > > there is one producer and one consumer. No broadcast or
> > > > > > > > topics (multiple consumers of the same message). Order is
> > > > > > > > preserved. At-most-once (i.e. may lose data in event of
> > > > > > > > failure). Producer and consumer may be on the same node or
> > > > > > > > different nodes. Correct?
> > > > > > > >
> > > > > > >
> > > > > > > Yes, you are correct in all of this. Since we don't use UDP in
> > > > > > > Drill, we do broadcast as a collection of individual p2p calls,
> > > > > > > all using the same message (and multiple reference counts if
> > > > > > > using raw bytes).
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > I’m not sure SidebandTunnel.close is necessary. I would
> > > > > > > > presume that a SidebandTunnel is closed when its associated
> > > > > > > > statement is closed, and only then.
> > > > > > > >
> > > > > > >
> > > > > > > I started without it. My thought was that we may need to
> > > > > > > signal that you've gotten all of a sideband stream prior to
> > > > > > > the close of a particular fragment. If I'm on the downside of
> > > > > > > an operation reporting multiple skips, I may want to hold off
> > > > > > > on reporting to the user until I have all of the messages. One
> > > > > > > option is for the sender to send a discrete message via the
> > > > > > > Tunnel close. The other option is an implicit message when the
> > > > > > > fragment is completed. I like the latter from a cleanliness
> > > > > > > perspective but think the former may be required. I'm OK with
> > > > > > > not exposing it at the tunnel level publicly at first, and we
> > > > > > > can always expose it later. I would love to hear whether
> > > > > > > people think there is going to be a need/use case to continue
> > > > > > > fragment operation but have another operator know that a
> > > > > > > sideband stream is complete. Maybe when sending a downstream
> > > > > > > set of samples on the first 1mm records of a larger scan?
> > > > > > >
> > > > > > >
> > > > > > > > Also, would it be easier if the tunnels were defined as part
> > > > > > > > of the DAG, and DAG initialization time was the only time
> > > > > > > > that they could be created?
> > > > > > > >
> > > > > > >
> > > > > > > That is a really good question. I need to think about it a
> > > > > > > bit. I'm not sure it is easier given my initial proposal is to
> > > > > > > piggy-back on the DataTunnel (which is independent of DAG
> > > > > > > initialization). However, it might be cleaner if operators
> > > > > > > have to declare this relationship at initialization time and
> > > > > > > it is all managed 'outside'.
> > > > > > >
> > > > > > > Thanks for the feedback. Will need to think further on your
> > > > > > > last point especially.
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > Julian
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau <jacques@dremio.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Please see some initial thoughts attached. Would love
> > > > > > > > > feedback and thoughts from others on how we can shape this.
> > > > > > > > >
> > > > > > > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Jacques Nadeau
> > > > > > > > > CTO and Co-Founder, Dremio
> > > > > > > > >
> > > > > > > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong <zfong@maprtech.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Yes, it would be great to get your thoughts so we can
> > > > > > > > >> assess the scope of what's involved.
> > > > > > > > >>
> > > > > > > > >> Thanks.
> > > > > > > > >>
> > > > > > > > >> -- Zelaine
> > > > > > > > >>
> > > > > > > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau <jacques@dremio.com>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >>> Definitely agree that we shouldn't boil the ocean. That
> > > > > > > > >>> said, I don't think we should make RecordBatch interface
> > > > > > > > >>> changes without deliberate design. Same for RPC protocol
> > > > > > > > >>> changes. Part of my internal struggle with the warning
> > > > > > > > >>> patch is exactly this lack of broader design. I think
> > > > > > > > >>> this is especially true given the drive to support
> > > > > > > > >>> backwards compatibility.
> > > > > > > > >>>
> > > > > > > > >>> I don't think we're talking about a massive undertaking.
> > > > > > > > >>> I'll try to write up some thoughts later this week to get
> > > > > > > > >>> the ball rolling. Sound good?
> > > > > > > > >>>
> > > > > > > > >>> --
> > > > > > > > >>> Jacques Nadeau
> > > > > > > > >>> CTO and Co-Founder, Dremio
> > > > > > > > >>> +1 on having a framework.
> > > > > > > > >>> OTOH, as with the warnings implementation, we might want
> > > > > > > > >>> to go ahead with a simpler implementation while we get a
> > > > > > > > >>> more generic framework design in place.
> > > > > > > > >>>
> > > > > > > > >>> Jacques, do you have any preliminary thoughts on the
> > > > > > > > >>> framework?
> > > > > > > > >>>
> > > > > > > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde <jhyde@apache.org>
> > > > > > > > >>> wrote:
> > > > > > > > >>>
> > > > > > > > >>>> +1 for a sideband mechanism.
> > > > > > > > >>>>
> > > > > > > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > > > > > > >>>>
> > > > > > > > >>>> In sideband use cases you described, the messages ran in
> > > > > > > > >>>> the opposite direction to the data. Would the sideband
> > > > > > > > >>>> also run in the same direction as the data? If so it
> > > > > > > > >>>> could carry warnings, rejected rows, progress
> > > > > > > > >>>> indications, and (for online aggregation[1])
> > > > > > > > >>>> notifications that a better approximate query result is
> > > > > > > > >>>> available.
> > > > > > > > >>>>
> > > > > > > > >>>> Julian
> > > > > > > > >>>>
> > > > > > > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau <jacques@dremio.com>
> > > > > > > > >>>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>> This seems like a form of sideband communication. I
> > > > > > > > >>>>> think we should have a framework for this type of thing
> > > > > > > > >>>>> in general rather than a one-off for this particular
> > > > > > > > >>>>> need. Other forms of sideband might be small table
> > > > > > > > >>>>> bloomfilter generation and pushdown into hbase,
> > > > > > > > >>>>> separate file assignment/partitioning providers
> > > > > > > > >>>>> balancing/generating scanner workloads, statistics
> > > > > > > > >>>>> generation for adaptive execution, etc.
> > > > > > > > >>>>>
> > > > > > > > >>>>> --
> > > > > > > > >>>>> Jacques Nadeau
> > > > > > > > >>>>> CTO and Co-Founder, Dremio
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu <hyichu@maprtech.com>
> > > > > > > > >>>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> I am trying to deal with the following scenario:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> A bunch of minor fragments are doing things in
> > > > > > > > >>>>>> parallel. Each of them could skip some records. Since
> > > > > > > > >>>>>> the downstream minor fragment needs to know the sum of
> > > > > > > > >>>>>> skipped-record-counts (in order to just display it or
> > > > > > > > >>>>>> see if the number exceeds the threshold) in the
> > > > > > > > >>>>>> upstreams, each upstream minor fragment needs to pass
> > > > > > > > >>>>>> this scalar with RecordBatch.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Since this seems to impact the protocol of RecordBatch,
> > > > > > > > >>>>>> I am looking for some advice here.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks.
