apex-dev mailing list archives

From Pramod Immaneni <pra...@datatorrent.com>
Subject Re: load based stream partitioning
Date Fri, 12 Feb 2016 01:46:41 GMT

On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas <tim@datatorrent.com> wrote:

> Comments inline
>
> +1 overall as well, provided APEXCORE-339 is implemented first and it is
> documented that the mechanism should not be used with some stateful
> operators.
>
> On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni <pramod@datatorrent.com>
> wrote:
>
> > Comments inline
> >
> > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <tim@datatorrent.com>
> > wrote:
> >
> > > Hey Pramod,
> > >
> > > I agree that if APEXCORE-339 is in place, it would work without
> > > redeploying containers for operators that are Stateless, or for a subset
> > > of Stateful operators.
> > >
> > > Addressing your previous questions.
> > >
> > > - The StatsListener can be used to see how far behind operators are.
> > >   You could determine which window the operator is on, the number of
> > >   tuples it has processed so far, or how long it takes to complete a
> > >   window.
> > >
> >
> > What if tuples are of different sizes, so that the number of tuples
> > processed doesn't reflect how far ahead or behind a downstream partition
> > is? How is the information from the StatsListener made available to the
> > StreamCodecs of the upstream partitions?
> >
> What information can the BufferServer provide that the StatsListener
> cannot?
>

The stats information would have to be relayed to the upstream operators.
It's possible.


>
> The StatsListener can trigger a repartition. The information in the
> StatsListener can be shared with the partitioner by setting the same
> object for both in populateDAG. The partitioner can then compute the new
> StreamCodec. The mechanism by which the upstream operator would be updated
> with the new StreamCodec would have to be implemented, as it's currently
> not there.
>
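The sharing arrangement described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not Apex code: `SharedPartitionStats`, `recordWindow`, `lagInWindows`, and `routingWeights` are hypothetical names, and the weighting rule (each partition's share inversely proportional to 1 + its lag in windows) is an assumption chosen for illustration. In an application the same instance would be handed to both the StatsListener and the Partitioner in populateDAG; here plain method calls stand in for those engine hooks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder shared by a stats listener and a partitioner.
// Not an Apex API: plain method calls stand in for the engine callbacks.
class SharedPartitionStats {
  // Latest window ID reported for each downstream partition.
  private final Map<Integer, Long> latestWindow = new HashMap<>();

  // Stats-listener side: record the window a partition has reached.
  synchronized void recordWindow(int partitionId, long windowId) {
    latestWindow.merge(partitionId, windowId, Long::max);
  }

  // Number of windows a partition trails the most advanced partition.
  synchronized long lagInWindows(int partitionId) {
    long newest = latestWindow.values().stream()
        .mapToLong(Long::longValue).max().orElse(0L);
    return newest - latestWindow.getOrDefault(partitionId, newest);
  }

  // Partitioner side: normalized routing shares, inversely proportional
  // to (1 + lag), from which a new StreamCodec could be built.
  synchronized Map<Integer, Double> routingWeights() {
    Map<Integer, Double> weights = new HashMap<>();
    double total = 0.0;
    for (int p : latestWindow.keySet()) {
      double w = 1.0 / (1 + lagInWindows(p));
      weights.put(p, w);
      total += w;
    }
    for (Map.Entry<Integer, Double> e : weights.entrySet()) {
      e.setValue(e.getValue() / total);
    }
    return weights;
  }

  public static void main(String[] args) {
    SharedPartitionStats stats = new SharedPartitionStats();
    stats.recordWindow(0, 100L);
    stats.recordWindow(1, 97L);
    System.out.println(stats.lagInWindows(1));          // 3
    System.out.println(stats.routingWeights().get(0));  // 0.8
  }
}
```

Window lag is used here only because it is one of the signals mentioned above; as noted in the thread, it does not account for tuples of different sizes.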
> >
> >
> > >
> > > - Some examples of Stateful operators that require repartitioning of
> > >   state are the following:
> > >       - Deduper
> > >            In this case, after the StreamCodec is updated, the operator
> > >            may allow a previously seen value to pass because the
> > >            partition didn't receive that value under the previous
> > >            StreamCodec.
> > >       - A key value store that holds aggregations for each key.
> > >            In this case, multiple partitions would hold partial
> > >            aggregations for a key when they are expecting to hold the
> > >            complete aggregation.
> > >
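The deduper case can be reproduced with a toy simulation. It assumes each deduper partition keeps only its own set of seen keys, and it models the StreamCodec change as switching from one hash-based routing function to another; `DeduperRoutingDemo` and `offer` are illustrative names, not Apex classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

// Toy model: each deduper partition remembers only the keys routed to it.
class DeduperRoutingDemo {
  private final List<Set<String>> seenPerPartition = new ArrayList<>();

  DeduperRoutingDemo(int partitions) {
    for (int i = 0; i < partitions; i++) {
      seenPerPartition.add(new HashSet<>());
    }
  }

  // Routes a key with the given codec; returns true if the receiving
  // partition emits it, i.e. that partition has not seen the key before.
  boolean offer(String key, ToIntFunction<String> codec) {
    return seenPerPartition.get(codec.applyAsInt(key)).add(key);
  }

  public static void main(String[] args) {
    DeduperRoutingDemo dedup = new DeduperRoutingDemo(2);
    // Old codec: hash routing. New codec: the same hash shifted by one, so
    // every key lands on the other partition (stands in for a load-based change).
    ToIntFunction<String> oldCodec = k -> Math.floorMod(k.hashCode(), 2);
    ToIntFunction<String> newCodec = k -> Math.floorMod(k.hashCode() + 1, 2);

    System.out.println(dedup.offer("order-42", oldCodec)); // true: first sighting
    System.out.println(dedup.offer("order-42", oldCodec)); // false: duplicate dropped
    System.out.println(dedup.offer("order-42", newCodec)); // true: duplicate leaks through
  }
}
```

The partial-aggregation case fails for the same reason: once a key's tuples are split across two partitions, neither partition's state is complete.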
> >
> > Agreed for the deduper. For the second case, a unifier is a better
> > approach, so that you are not affected by key skew in general.
> >
> This is not always possible. We can discuss this offline, since it won't
> add much to the discussion here to go into the details.
>
>
Yes, not always.


> >
> >
> > >
> > > Tim
> > >
> > > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > wrote:
> > >
> > > > Additionally, it can be treated as a non-idempotent stream for
> > > > recovery; see APEXCORE-339. In cases where the downstream partitions
> > > > require some key-based partitioning, what you are suggesting would be a
> > > > good approach, but it would require more complex logic in the
> > > > StreamCodec to do both key-based and load-based partitioning.
> > > >
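One possible shape for a StreamCodec that combines key-based and load-based partitioning is sticky key assignment: a key is bound to the least-loaded partition the first time it appears, and every later tuple with that key follows it. The sketch below is standalone and its names are hypothetical; `getPartition` is only loosely modeled on the StreamCodec hook, and "load" is simply the number of tuples routed so far. The key-to-partition table is itself routing state that would have to survive recovery, which is the P1/P2 concern Gaurav raises in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical router: key affinity plus load balancing for new keys.
class StickyKeyLoadRouter {
  private final long[] load;  // tuples routed to each partition so far
  private final Map<String, Integer> assignment = new HashMap<>();

  StickyKeyLoadRouter(int partitions) {
    load = new long[partitions];
  }

  int getPartition(String key) {
    // Existing keys keep their partition; new keys go to the least loaded one.
    int p = assignment.computeIfAbsent(key, k -> leastLoaded());
    load[p]++;
    return p;
  }

  private int leastLoaded() {
    int best = 0;
    for (int i = 1; i < load.length; i++) {
      if (load[i] < load[best]) {
        best = i;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    StickyKeyLoadRouter router = new StickyKeyLoadRouter(2);
    for (String key : new String[] {"a", "a", "b", "c", "a"}) {
      System.out.println(key + " -> " + router.getPartition(key));
    }
  }
}
```

Running `main` prints `a -> 0`, `a -> 0`, `b -> 1`, `c -> 1`, `a -> 0`: new keys `b` and `c` go to the lighter partition while `a` stays put.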
> > > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > wrote:
> > > >
> > > > > How would you know how far behind partitions are without interacting
> > > > > with the BufferServer, as you suggested in the earlier email?
> > > > > Secondly, why would changing where the data is sent, based on load,
> > > > > mandate re-partitioning if the downstream partitions can handle data
> > > > > with different keys?
> > > > >
> > > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <tim@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > >> Hey Pramod,
> > > > >>
> > > > >> I think that, both in general and for recovery, the existing
> > > > >> partitioning machinery can be reused to update the StreamCodec. The
> > > > >> reason is that if the operator is Stateful and changes are made to
> > > > >> the StreamCodec, the state of the partitions will also have to be
> > > > >> repartitioned. In this case the number of partitions will remain the
> > > > >> same; only the state of the partitions is reshuffled. Reshuffling
> > > > >> this state in a fault-tolerant way is already handled by the dynamic
> > > > >> partitioning logic, so it could be extended to update the StreamCodec
> > > > >> as well.
> > > > >>
> > > > >> If the operator is Stateless, it may be possible to do this without
> > > > >> redeploying any containers. But with the way I am envisioning it, I
> > > > >> think there would be a lot of difficult-to-handle corner cases for
> > > > >> recovery.
> > > > >>
> > > > >> Tim
> > > > >>
> > > > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Comment inline.
> > > > >> >
> > > > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <tim@datatorrent.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > +1 for the idea.
> > > > >> > >
> > > > >> > > Gaurav, this could be done idempotently in the same way that
> > > > >> > > dynamic repartitioning is done idempotently. All the partitions
> > > > >> > > are rolled back to a common checkpoint and the new StreamCodec is
> > > > >> > > applied starting from there. The statistics that the StreamCodec is
> > > > >> > > given are the statistics for the windows computed before the
> > > > >> > > common checkpoint that the partitions are rolled back to.
> > > > >> > >
> > > > >> > > In fact, I think this feature could be added easily by avoiding
> > > > >> > > the BufferServer entirely and by allowing the Partitioner to
> > > > >> > > redefine the StreamCodec for the operator when definePartitions is
> > > > >> > > called.
> > > > >> > >
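The rollback step Tim describes can be sketched as two small computations: finding the latest checkpoint window that every partition has, and restricting the statistics handed to the new StreamCodec to windows up to that checkpoint. This assumes each partition exposes the set of window IDs it has checkpointed and that per-window statistics are kept as a map from window ID to a tuple count; `CommonCheckpoint` and its methods are hypothetical helpers, not engine APIs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class CommonCheckpoint {
  // Latest window ID checkpointed by every partition, or -1 if there is none.
  static long latestCommon(Collection<? extends SortedSet<Long>> checkpointsPerPartition) {
    Iterator<? extends SortedSet<Long>> it = checkpointsPerPartition.iterator();
    if (!it.hasNext()) {
      return -1L;
    }
    TreeSet<Long> common = new TreeSet<Long>(it.next());
    while (it.hasNext()) {
      common.retainAll(it.next());  // keep only windows all partitions share
    }
    return common.isEmpty() ? -1L : common.last();
  }

  // Per-window statistics for windows up to and including the checkpoint.
  static SortedMap<Long, Long> statsUpTo(SortedMap<Long, Long> tuplesPerWindow, long checkpoint) {
    return tuplesPerWindow.headMap(checkpoint + 1);
  }

  public static void main(String[] args) {
    List<TreeSet<Long>> checkpoints = Arrays.asList(
        new TreeSet<Long>(Arrays.asList(10L, 20L, 30L)),
        new TreeSet<Long>(Arrays.asList(10L, 20L)),
        new TreeSet<Long>(Arrays.asList(20L, 30L)));
    long cp = latestCommon(checkpoints);
    System.out.println(cp); // 20

    SortedMap<Long, Long> stats = new TreeMap<>();
    stats.put(10L, 500L);
    stats.put(20L, 700L);
    stats.put(30L, 900L);
    System.out.println(statsUpTo(stats, cp)); // {10=500, 20=700}
  }
}
```

Because every partition restarts from the same window and the codec only sees statistics from before it, replaying from that checkpoint routes tuples the same way each time.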
> > > > >> >
> > > > >> > Are you saying this in the context of recovery or in general?
> > > > >> >
> > > > >> >
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > > Tim
> > > > >> > >
> > > > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <amol@datatorrent.com>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Gaurav,
> > > > >> > > > It would not be idempotent per partition, but it will be
> > > > >> > > > idempotent across all partitions combined. In this case the
> > > > >> > > > user would have explicitly asked for such a pattern.
> > > > >> > > >
> > > > >> > > > Thks,
> > > > >> > > > Amol
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <gaurav.gopi123@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > Pramod,
> > > > >> > > > >
> > > > >> > > > > How would it work with recovery? There could be cases where a
> > > > >> > > > > tuple went to P1 and, post recovery, it could go to P2.
> > > > >> > > > >
> > > > >> > > > > Gaurav
> > > > >> > > > >
> > > > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > >> > > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi,
> > > > >> > > > > >
> > > > >> > > > > > There are scenarios where the downstream partitions of an
> > > > >> > > > > > upstream operator are not performing uniformly, resulting
> > > > >> > > > > > in overall sub-optimal performance dictated by the slowest
> > > > >> > > > > > partitions. The reasons could be data related, such as some
> > > > >> > > > > > partitions receiving more data to process than others, or
> > > > >> > > > > > environment related, such as some partitions running slower
> > > > >> > > > > > than others because they are on heavily loaded nodes.
> > > > >> > > > > >
> > > > >> > > > > > A solution based on functionality currently available in
> > > > >> > > > > > the engine would be to write a StreamCodec implementation
> > > > >> > > > > > that distributes data among the partitions such that each
> > > > >> > > > > > partition receives a similar amount of data to process. We
> > > > >> > > > > > should consider adding StreamCodecs like these to the
> > > > >> > > > > > library, but they do not solve the problem when it is
> > > > >> > > > > > environment related.
> > > > >> > > > > >
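For the data-related case, a codec that balances by serialized size rather than tuple count also addresses the concern, raised in this thread, about tuples of different sizes. A minimal sketch, assuming the codec sees each tuple's serialized bytes before choosing a destination; `SizeBalancingRouter` and `route` are hypothetical names, and greedy least-bytes assignment is just one simple balancing rule.

```java
// Hypothetical size-aware router: each tuple goes to the partition that has
// been sent the fewest bytes so far (greedy balancing).
class SizeBalancingRouter {
  private final long[] bytesSent;

  SizeBalancingRouter(int partitions) {
    bytesSent = new long[partitions];
  }

  int route(byte[] serializedTuple) {
    int best = 0;
    for (int i = 1; i < bytesSent.length; i++) {
      if (bytesSent[i] < bytesSent[best]) {
        best = i;
      }
    }
    bytesSent[best] += serializedTuple.length;
    return best;
  }

  long bytesSentTo(int partition) {
    return bytesSent[partition];
  }

  public static void main(String[] args) {
    SizeBalancingRouter router = new SizeBalancingRouter(2);
    System.out.println(router.route(new byte[100])); // 0
    System.out.println(router.route(new byte[10]));  // 1
    System.out.println(router.route(new byte[10]));  // 1: partition 0 still has more bytes
  }
}
```

Since the choice depends on what was routed earlier, a replay after failure can send a tuple to a different partition unless the counters are restored from the same checkpoint; and because it only sees what it sends, it cannot detect a partition that is slow because of its node.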
> > > > >> > > > > > For that, a better and more comprehensive approach would be
> > > > >> > > > > > to look at how data is being consumed by the downstream
> > > > >> > > > > > partitions from the BufferServer and use that information
> > > > >> > > > > > to decide how to send future data. If some partitions are
> > > > >> > > > > > behind others in consuming data, then data can be directed
> > > > >> > > > > > to the other partitions. One way to do this would be to
> > > > >> > > > > > relay this type of statistical and positional information
> > > > >> > > > > > from the BufferServer to the upstream publishers. The
> > > > >> > > > > > publishers can use this information in ways such as making
> > > > >> > > > > > it available to StreamCodecs to affect the destination of
> > > > >> > > > > > future data.
> > > > >> > > > > >
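The feedback-driven variant can be sketched by having the publisher track, per downstream partition, how much it has published versus the last consumed position relayed back from the BufferServer, and route each tuple to the partition with the smallest backlog. That relay does not exist today: `onConsumedPosition` is a hypothetical callback standing in for it, `BacklogAwareRouter` is an illustrative name, and positions are measured in bytes purely as an assumption.

```java
// Hypothetical publisher-side router driven by consumption feedback.
class BacklogAwareRouter {
  private final long[] published; // bytes published per partition
  private final long[] consumed;  // last consumed position per partition

  BacklogAwareRouter(int partitions) {
    published = new long[partitions];
    consumed = new long[partitions];
  }

  // Hypothetical feedback hook: how far a downstream partition has read.
  void onConsumedPosition(int partition, long position) {
    consumed[partition] = Math.max(consumed[partition], position);
  }

  long backlog(int partition) {
    return published[partition] - consumed[partition];
  }

  // Sends the tuple to the partition with the least unconsumed data.
  int route(int tupleSizeBytes) {
    int best = 0;
    for (int i = 1; i < published.length; i++) {
      if (backlog(i) < backlog(best)) {
        best = i;
      }
    }
    published[best] += tupleSizeBytes;
    return best;
  }

  public static void main(String[] args) {
    BacklogAwareRouter router = new BacklogAwareRouter(2);
    router.route(50);                 // partition 0
    router.route(50);                 // partition 1
    router.onConsumedPosition(1, 50); // partition 1 has caught up, 0 has not
    System.out.println(router.route(10)); // 1
  }
}
```

Unlike a purely size-based codec, this reacts to environment-related slowness: a partition on a heavily loaded node shows up as a growing backlog even when it has been sent the same amount of data as its peers.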
> > > > >> > > > > > What do you think?
> > > > >> > > > > >
> > > > >> > > > > > Thanks
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
