apex-dev mailing list archives

From Timothy Farkas <...@datatorrent.com>
Subject Re: load based stream partitioning
Date Fri, 12 Feb 2016 03:14:35 GMT
Hey Pramod,

I thought APEX-339 would cause everyone to get restarted to a common
checkpoint if P1 failed. But I think I misunderstood.
Based on what you just said, how do you guarantee that P1 will receive the
same data as before? Will Stream Codecs only apply to windows higher than a
certain id?

Building on Gaurav's example, let's say we have the following situation:

1. We are using Stream Codec A.
2. On window 30 we start using Stream Codec B.
3. At window 40, P1 fails and comes back at window 20.
4. P2 never fails and keeps running the whole time.

Will Stream Codec A still apply to the data P1 gets for windows 20 up to 30,
with Stream Codec B used for windows 30 and above? If not, some data could be
lost or duplicated.
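
To make the question concrete, here is a minimal sketch of the behavior I am
asking about: the codec used for a window is chosen by window id, so a
replayed window is routed the same way it was the first time. This is only an
illustration under my own assumptions. WindowedCodecHistory, activate and
codecFor are hypothetical names, not part of the Apex API, and codecs are
represented by plain strings.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Apex API: remembers which codec became
// active at which window so that replayed windows are routed the same way.
final class WindowedCodecHistory {
    // Key: first window id the codec applies to. Value: codec name.
    private final TreeMap<Long, String> codecByStartWindow = new TreeMap<>();

    // Record that codecName is in force from startWindowId onward.
    void activate(long startWindowId, String codecName) {
        codecByStartWindow.put(startWindowId, codecName);
    }

    // Return the codec that was in force for the given window: the entry
    // with the greatest start window that is <= windowId.
    String codecFor(long windowId) {
        Map.Entry<Long, String> entry = codecByStartWindow.floorEntry(windowId);
        if (entry == null) {
            throw new IllegalStateException("no codec registered for window " + windowId);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        WindowedCodecHistory history = new WindowedCodecHistory();
        history.activate(0, "A");   // Stream Codec A from the start
        history.activate(30, "B");  // Stream Codec B from window 30
        // P1 fails at window 40 and replays from window 20.
        for (long window : new long[] {20, 29, 30, 40}) {
            System.out.println("window " + window + " -> codec " + history.codecFor(window));
        }
    }
}
```

With a lookup like this, P1 replaying windows 20 through 29 would still be fed
according to Stream Codec A, and window 30 onward according to Stream Codec B,
which is the guarantee I am asking about.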

Thanks,
Tim

On Thu, Feb 11, 2016 at 6:58 PM, Pramod Immaneni <pramod@datatorrent.com>
wrote:

> P1 and P2 have an upstream operator sending data. When P1 restarts, it
> will receive the same data as before, which is already present in the buffer
> server. If the upstream operator has to restart, all downstream operators
> will be reset to the same checkpoint.
>
> On Thu, Feb 11, 2016 at 6:03 PM, Gaurav Gupta <gaurav.gopi123@gmail.com>
> wrote:
>
> > How will it work in the following scenario?
> >
> > Say operator O has two partitions, P1 and P2. P1 was processing faster than
> > P2, so the StreamCodec decided to send more data to P1. P1 processes some
> > windows, then crashes and comes back at its previous checkpoint. Now P1
> > comes up on a slow node and processes slowly, so the StreamCodec decides
> > to send less data to P1. In this case, won't the application lose data for
> > some windows?
> >
> > Similarly, in the reverse scenario, I think there will be duplicate data.
> >
> >
> >
> > On Thu, Feb 11, 2016 at 5:46 PM, Pramod Immaneni <pramod@datatorrent.com>
> > wrote:
> >
> > > Inline
> > >
> > > On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas <tim@datatorrent.com>
> > > wrote:
> > >
> > > > Comments inline
> > > >
> > > > +1 Overall as well provided Apex-339 is implemented first and it is
> > > > documented that the mechanism should not be used with some stateful
> > > > operators.
> > > >
> > > > > On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > wrote:
> > > >
> > > > > Comments inline
> > > > >
> > > > > > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <tim@datatorrent.com>
> > > > > > wrote:
> > > > >
> > > > > > Hey Pramod,
> > > > > >
> > > > > > > I agree if APEX-339 is in place then it would work without
> > > > > > > redeploying containers for operators that are Stateless, or a
> > > > > > > subset of Stateful operators.
> > > > > >
> > > > > > Addressing your previous questions.
> > > > > >
> > > > > > > - The StatsListener can be used to see how far behind operators
> > > > > > > are. You could determine what window the operator is on, the
> > > > > > > number of tuples it has processed so far, or how long it takes
> > > > > > > to complete a window.
> > > > > >
> > > > >
> > > > > > What if tuples are different sizes and the number of tuples
> > > > > > processed doesn't reflect how far ahead or behind a downstream
> > > > > > partition is? How is the information from the StatsListener made
> > > > > > available to the upstream partition codecs?
> > > > >
> > > > > What is the information Buffer Server can provide that the
> > > > > StatsListener cannot?
> > > >
> > >
> > > The stats information would have to be relayed down to the upstream
> > > operators. It's possible.
> > >
> > >
> > > >
> > > > > The StatsListener can trigger a repartition. The information in the
> > > > > StatsListener can be shared with the partitioner by setting the same
> > > > > object for both in populateDAG. The partitioner can then compute the
> > > > > new Stream Codec. The mechanism by which the upstream would be
> > > > > updated with the new Stream Codec would have to be implemented, as
> > > > > it's currently not there.
> > > >
> > > > >
> > > > >
> > > > > >
> > > > > > > - Some examples of Stateful operators that require repartitioning
> > > > > > > of state are the following:
> > > > > > >       - Deduper
> > > > > > >            In this case, after updating the stream codec, the
> > > > > > >            operator may allow a previously seen value to pass
> > > > > > >            because the partition didn't receive that value with
> > > > > > >            the previous stream codec.
> > > > > > >       - A key value store that holds aggregations for each key.
> > > > > > >            In this case multiple partitions would hold partial
> > > > > > >            aggregations for a key, when they are expecting to
> > > > > > >            hold the complete aggregation.
> > > > > >
> > > > >
> > > > > > Agreed for deduper. For the second case a unifier is a better
> > > > > > approach so that you are not affected by key skew in general.
> > > > >
> > > > > This is not always possible. We can discuss this offline, since it
> > > > > won't add much to the discussion here to go into the details.
> > > >
> > > >
> > > Yes not always.
> > >
> > >
> > > > >
> > > > >
> > > > > >
> > > > > > Tim
> > > > > >
> > > > > > > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > > Additionally it can be treated as a non-idempotent stream for
> > > > > > > > recovery. Look at APEXCORE-339. In cases where the downstream
> > > > > > > > partitions require some key-based partitioning, what you are
> > > > > > > > suggesting would be a good approach, but it will require more
> > > > > > > > complex logic in the StreamCodec to do both key-based and
> > > > > > > > load-based partitioning.
> > > > > > >
> > > > > > > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > > How would you know how far behind partitions are without
> > > > > > > > > interacting with BufferServer, like you were mentioning in
> > > > > > > > > the earlier email? Secondly, why would changing where the
> > > > > > > > > data is sent mandate re-partitioning if the downstream
> > > > > > > > > partitions can handle data with different keys?
> > > > > > > >
> > > > > > > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <tim@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hey Pramod,
> > > > > > > >>
> > > > > > > > >> I think in general and for recovery the existing Partitioning
> > > > > > > > >> machinery can be reused to update the Stream Codec.
> > > > > > > > >> The reason is that if the operator is Stateful and changes are
> > > > > > > > >> made to the Stream Codec, the state of the partitions will also
> > > > > > > > >> have to be repartitioned.
> > > > > > > > >> In this case the number of partitions will remain the same; just
> > > > > > > > >> the state of the partitions is reshuffled. The implementation for
> > > > > > > > >> this state reshuffling in a fault-tolerant way is already handled
> > > > > > > > >> by the Dynamic Partitioning logic, so it could be extended to
> > > > > > > > >> update the Stream Codec as well.
> > > > > > > >>
> > > > > > > > >> If the operator is Stateless, it may be possible to do without
> > > > > > > > >> redeploying any containers. But with the way I am envisioning it,
> > > > > > > > >> I think there would be a lot of difficult-to-handle corner cases
> > > > > > > > >> for recovery.
> > > > > > > >>
> > > > > > > >> Tim
> > > > > > > >>
> > > > > > > > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Comment inline.
> > > > > > > >> >
> > > > > > > > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <tim@datatorrent.com>
> > > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > +1 for the idea.
> > > > > > > >> > >
> > > > > > > > >> > > Gaurav, this could be done idempotently in the same way that
> > > > > > > > >> > > dynamic repartitioning is done idempotently. All the partitions
> > > > > > > > >> > > are rolled back to a common checkpoint and the new StreamCodec
> > > > > > > > >> > > is applied starting then. The statistics that the Stream Codec
> > > > > > > > >> > > is given are the statistics for the windows computed before the
> > > > > > > > >> > > common checkpoint that the partitions are rolled back to.
> > > > > > > >> > >
> > > > > > > > >> > > In fact I think this feature could be added easily by avoiding
> > > > > > > > >> > > buffer server entirely and by allowing the Partitioner to
> > > > > > > > >> > > redefine the StreamCodec for the operator when definePartitions
> > > > > > > > >> > > is called.
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > > >> > Are you saying this in the context of recovery or in general?
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > > Tim
> > > > > > > >> > >
> > > > > > > > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <amol@datatorrent.com>
> > > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Gaurav,
> > > > > > > > >> > > > It would not be idempotent per partition, but will be across
> > > > > > > > >> > > > all partitions combined. In this case the user would have
> > > > > > > > >> > > > explicitly asked for such a pattern.
> > > > > > > >> > > >
> > > > > > > >> > > > Thks,
> > > > > > > >> > > > Amol
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <gaurav.gopi123@gmail.com>
> > > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Pramod,
> > > > > > > >> > > > >
> > > > > > > > >> > > > > How would it work with recovery? There could be cases where
> > > > > > > > >> > > > > a tuple went to P1 and post recovery it can go to P2.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Gaurav
> > > > > > > >> > > > >
> > > > > > > > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hi,
> > > > > > > >> > > > > >
> > > > > > > > >> > > > > > There are scenarios where the downstream partitions of an
> > > > > > > > >> > > > > > upstream operator are not performing uniformly, resulting
> > > > > > > > >> > > > > > in overall sub-optimal performance dictated by the slowest
> > > > > > > > >> > > > > > partitions. The reasons could be data related, such as some
> > > > > > > > >> > > > > > partitions receiving more data to process than the others,
> > > > > > > > >> > > > > > or environment related, such as some partitions running
> > > > > > > > >> > > > > > slower than others because they are on heavily loaded nodes.
> > > > > > > >> > > > > >
> > > > > > > > >> > > > > > A solution based on currently available functionality in the
> > > > > > > > >> > > > > > engine would be to write a StreamCodec implementation that
> > > > > > > > >> > > > > > distributes data among the partitions such that each
> > > > > > > > >> > > > > > partition receives a similar amount of data to process. We
> > > > > > > > >> > > > > > should consider adding StreamCodecs like these to the
> > > > > > > > >> > > > > > library, but they do not solve the problem when it is
> > > > > > > > >> > > > > > environment related.
> > > > > > > >> > > > > >
> > > > > > > > >> > > > > > For that, a better and more comprehensive approach would be
> > > > > > > > >> > > > > > to look at how data is being consumed by the downstream
> > > > > > > > >> > > > > > partitions from the BufferServer and use that information to
> > > > > > > > >> > > > > > make decisions on how to send future data. If some
> > > > > > > > >> > > > > > partitions are behind others in consuming data, then data
> > > > > > > > >> > > > > > can be directed to the other partitions. One way to do this
> > > > > > > > >> > > > > > would be to relay this type of statistical and positional
> > > > > > > > >> > > > > > information from BufferServer to the upstream publishers.
> > > > > > > > >> > > > > > The publishers can use this information in ways such as
> > > > > > > > >> > > > > > making it available to StreamCodecs to affect the
> > > > > > > > >> > > > > > destination of future data.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > What do you think.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
