apex-dev mailing list archives

From Timothy Farkas <...@datatorrent.com>
Subject Re: load based stream partitioning
Date Fri, 12 Feb 2016 00:32:35 GMT
Comments inline

+1 overall as well, provided APEXCORE-339 is implemented first and it is
documented that the mechanism should not be used with some stateful
operators.

On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni <pramod@datatorrent.com>
wrote:

> Comments inline
>
> On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <tim@datatorrent.com>
> wrote:
>
> > Hey Pramod,
> >
> > I agree if APEX-339 is in place then it would work without redeploying
> > containers for operators that are Stateless, or a subset of Stateful
> > operators.
> >
> > Addressing your previous questions.
> >
> > - The StatsListener can be used to see how far behind operators are. You
> > could determine what window the operator is on, or the number of tuples
> > it's processed so far, or how long it takes it to complete a window.
> >
>
> What if tuples are different sizes and the number of tuples processed
> doesn't reflect how far ahead or behind a downstream partition is? How is
> the information from StatsListener made available to the upstream partition
> codecs?
>
What information can the Buffer Server provide that the StatsListener
cannot?

The StatsListener can trigger a repartition. The information in the
StatsListener can be shared with the partitioner by setting the same
object for both in populateDAG. The partitioner can then compute the new
StreamCodec. The mechanism by which the upstream would be updated with the
new StreamCodec would have to be implemented, as it's currently not there.
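
To make the shared-object idea concrete, here is a rough sketch in plain
Java. It deliberately does not use the Apex API: SharedLoadStats, record,
routingWeights and choosePartition are all hypothetical names, and how the
resulting weights would reach the upstream is exactly the missing piece
mentioned above. It only shows one object being written by the listener
side and read by the partitioner side to derive a load-aware routing.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the one object set as both listener and partitioner. */
public class SharedLoadStats {
    // Latest per-partition window latency in ms, written by the listener side.
    private final long[] windowLatencyMs;

    public SharedLoadStats(int partitionCount) {
        windowLatencyMs = new long[partitionCount];
        Arrays.fill(windowLatencyMs, 1L); // start with equal load
    }

    /** Listener side: record how long a partition took to complete a window. */
    public synchronized void record(int partition, long latencyMs) {
        windowLatencyMs[partition] = Math.max(1L, latencyMs);
    }

    /** Partitioner side: weights inversely proportional to latency, summing to 1. */
    public synchronized double[] routingWeights() {
        double[] weights = new double[windowLatencyMs.length];
        double total = 0.0;
        for (int i = 0; i < weights.length; i++) {
            weights[i] = 1.0 / windowLatencyMs[i];
            total += weights[i];
        }
        for (int i = 0; i < weights.length; i++) {
            weights[i] /= total;
        }
        return weights;
    }

    /** What a load-aware codec might do: map a tuple hash onto a partition by weight. */
    public static int choosePartition(int tupleHash, double[] weights) {
        // Turn the hash into a position in [0, 1).
        double point = (tupleHash & 0x7fffffff) / (double) (1L << 31);
        double cumulative = 0.0;
        for (int i = 0; i < weights.length; i++) {
            cumulative += weights[i];
            if (point < cumulative) {
                return i;
            }
        }
        return weights.length - 1; // guard against floating point rounding
    }

    public static void main(String[] args) {
        SharedLoadStats stats = new SharedLoadStats(2);
        stats.record(0, 100); // partition 0 completes a window in 100 ms
        stats.record(1, 300); // partition 1 is three times slower
        double[] weights = stats.routingWeights();
        System.out.println(Arrays.toString(weights));
        System.out.println(choosePartition("some-key".hashCode(), weights));
    }
}
```

Since the routing depends only on the tuple hash and the weights, it stays
fixed until the weights are recomputed, which fits with only changing the
codec at a repartition. It still moves keys between partitions, so the
caveat about stateful operators applies.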

>
>
> >
> > - Some examples of Stateful operators that require repartitioning of
> > state are the following:
> >       - Deduper
> >            In this case after updating the stream codec the operator may
> > allow a previously seen value to pass because the partition didn't
> > receive that value with the previous stream codec.
> >       - A key value store that holds aggregations for each key.
> >            In this case multiple partitions would hold partial
> > aggregations for a key, when they are expecting to hold the complete
> > aggregation.
> >
>
> Agreed for deduper. For the second case a unifier is a better approach so
> that you are not affected by key skew in general.
>
This is not always possible. We can discuss this offline, since it won't
add much to the discussion here to go into the details.

>
>
> >
> > Tim
> >
> > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <pramod@datatorrent.com>
> > wrote:
> >
> > > Additionally it can be treated as a non-idempotent stream for recovery.
> > > Look at APEXCORE-339. In cases where the downstream partitions require
> > > some key based partitioning, what you are suggesting would be a good
> > > approach but it will require more complex logic in the StreamCodec to
> > > do both key and load based partitioning.
> > >
> > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni
> > > <pramod@datatorrent.com> wrote:
> > >
> > > > How would you know how far behind partitions are without interacting
> > > > with BufferServer like you were mentioning in the earlier email?
> > > > Secondly, why would changing where the data is sent mandate
> > > > re-partitioning if the downstream partitions can handle data with
> > > > different keys?
> > > >
> > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas
> > > > <tim@datatorrent.com> wrote:
> > > >
> > > >> Hey Pramod,
> > > >>
> > > > >> I think in general and for recovery the existing Partitioning
> > > > >> machinery can be reused to update the Stream Codec.
> > > > >> The reason is that if the operator is Stateful and changes are
> > > > >> made to the Stream Codec, the state of the partitions will also
> > > > >> have to be repartitioned.
> > > > >> In this case the number of partitions will remain the same, just
> > > > >> the state of the partitions is reshuffled. The implementation for
> > > > >> this state reshuffling in a fault tolerant way is already handled
> > > > >> by the Dynamic Partitioning logic, so it could be extended to
> > > > >> update the Stream Codec as well.
> > > >>
> > > > >> If the operator is Stateless, it may be possible to do without
> > > > >> redeploying any containers. But with the way I am envisioning it,
> > > > >> I think there would be a lot of difficult-to-handle corner cases
> > > > >> for recovery.
> > > >>
> > > >> Tim
> > > >>
> > > > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni
> > > > >> <pramod@datatorrent.com> wrote:
> > > >>
> > > >> > Comment inline.
> > > >> >
> > > > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas
> > > > >> > <tim@datatorrent.com> wrote:
> > > >> >
> > > >> > > +1 for the idea.
> > > >> > >
> > > > >> > > Gaurav, this could be done idempotently in the same way that
> > > > >> > > dynamic repartitioning is done idempotently. All the
> > > > >> > > partitions are rolled back to a common checkpoint and the new
> > > > >> > > StreamCodec is applied starting then. The statistics that the
> > > > >> > > StreamCodec is given are the statistics for the windows
> > > > >> > > computed before the common checkpoint that the partitions are
> > > > >> > > rolled back to.
> > > >> > >
> > > > >> > > In fact I think this feature could be added easily by
> > > > >> > > avoiding buffer server entirely and by allowing the
> > > > >> > > Partitioner to redefine the StreamCodec for the operator when
> > > > >> > > definePartitions is called.
> > > >> > >
> > > >> >
> > > >> > Are you saying this in context of recovery or in general?
> > > >> >
> > > >> >
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Tim
> > > >> > >
> > > > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre
> > > > >> > > <amol@datatorrent.com> wrote:
> > > >> > >
> > > >> > > > Gaurav,
> > > > >> > > > It would not be idempotent per partition, but will be
> > > > >> > > > across all partitions combined. In this case the user would
> > > > >> > > > have explicitly asked for such a pattern.
> > > >> > > >
> > > >> > > > Thks,
> > > >> > > > Amol
> > > >> > > >
> > > >> > > >
> > > > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta
> > > > >> > > > <gaurav.gopi123@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Pramod,
> > > >> > > > >
> > > > >> > > > > How would it work with recovery? There could be cases
> > > > >> > > > > where a tuple went to P1 and post recovery it can go to
> > > > >> > > > > P2.
> > > >> > > > >
> > > >> > > > > Gaurav
> > > >> > > > >
> > > > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni
> > > > >> > > > > <pramod@datatorrent.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hi,
> > > >> > > > > >
> > > > >> > > > > > There are scenarios where the downstream partitions of
> > > > >> > > > > > an upstream operator are generally not performing
> > > > >> > > > > > uniformly, resulting in an overall sub-optimal
> > > > >> > > > > > performance dictated by the slowest partitions. The
> > > > >> > > > > > reasons could be data related, such as some partitions
> > > > >> > > > > > are receiving more data to process than the others, or
> > > > >> > > > > > could be environment related, such as some partitions
> > > > >> > > > > > are running slower than others because they are on
> > > > >> > > > > > heavily loaded nodes.
> > > >> > > > > >
> > > > >> > > > > > A solution based on currently available functionality
> > > > >> > > > > > in the engine would be to write a StreamCodec
> > > > >> > > > > > implementation to distribute data among the partitions
> > > > >> > > > > > such that each partition is receiving a similar amount
> > > > >> > > > > > of data to process. We should consider adding
> > > > >> > > > > > StreamCodecs like these to the library but these
> > > > >> > > > > > however do not solve the problem when it is
> > > > >> > > > > > environment related.
> > > >> > > > > >
> > > > >> > > > > > For that a better and more comprehensive approach
> > > > >> > > > > > would be to look at how data is being consumed by the
> > > > >> > > > > > downstream partitions from the BufferServer and use
> > > > >> > > > > > that information to make decisions on how to send
> > > > >> > > > > > future data. If some partitions are behind others in
> > > > >> > > > > > consuming data then data can be directed to the other
> > > > >> > > > > > partitions. One way to do this would be to relay this
> > > > >> > > > > > type of statistical and positional information from
> > > > >> > > > > > BufferServer to the upstream publishers. The
> > > > >> > > > > > publishers can use this information in ways such as
> > > > >> > > > > > making it available to StreamCodecs to affect the
> > > > >> > > > > > destination of future data.
> > > >> > > > > >
> > > > >> > > > > > What do you think?
> > > >> > > > > >
> > > >> > > > > > Thanks
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
