apex-dev mailing list archives

From David Yan <da...@datatorrent.com>
Subject Re: Supporting iterations in Apex
Date Tue, 03 Nov 2015 23:44:41 GMT
Interesting idea about chaining multiple delay operators.
I think nothing prevents us from supporting that scenario in either
option, except that option 1 would need an extra dedicated delay
operator for each feedback loop.
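
To make the chaining scenario concrete, here is a minimal sketch in plain Java (not Apex code). It models k chained one-step delays as a tapped delay line, so that at step n the taps expose the values from steps n - 1 through n - k. The class name ChainedDelaySketch and the step method are made up for illustration and are not part of any Apex API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration only: k chained one-step delays as a tapped delay line. */
final class ChainedDelaySketch {
    private final int k;
    // Most recent value first; holds at most k past values.
    private final Deque<Double> history = new ArrayDeque<>();

    ChainedDelaySketch(int k) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be >= 1");
        }
        this.k = k;
    }

    /** Feeds the value for step n and returns the taps [x(n-1), ..., x(n-k)] seen so far. */
    List<Double> step(double current) {
        List<Double> taps = new ArrayList<>(history);
        history.addFirst(current);
        if (history.size() > k) {
            history.removeLast();
        }
        return taps;
    }

    public static void main(String[] args) {
        ChainedDelaySketch chain = new ChainedDelaySketch(3);
        for (int n = 1; n <= 5; n++) {
            System.out.println("n=" + n + " taps=" + chain.step(n));
        }
    }
}
```

With k = 3, the fifth step sees the values from steps 4, 3 and 2, which are the (n, n - 1) ... (n, n - k) pairs Tim describes.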


On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <tim@datatorrent.com> wrote:

> +1 for option 2
>
> Also would it be possible to chain delay operators? A lot of stochastic and
> adaptive methods depend on finding the correlations between the current
> time step n and previous k time steps  (n, n - 1), (n, n - 2), (n, n - 3)
> ... (n, n - k)
>
> Here is a picture of a model that uses delay operators (in the picture
> these are represented by z^-1) and is used for time series prediction.
>
>
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
>
> On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sasha@datatorrent.com>
> wrote:
>
> > +1 for option 2.  Although option 2 doesn't mirror the current unifiers
> > the way option 1 does, and may look more complicated in the logical plan,
> > I think the flexibility of specifying locality and the ability to bring
> > multiple downstream operators into a single delay operator may be
> > important for some projects.  For me the added flexibility wins,
> > particularly in light of efforts towards a simpler high level API.
> >
> >
> >
> > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <david@datatorrent.com> wrote:
> >
> > > Please share your thoughts using the dev mailing list on this topic if
> > > you can.  Thanks.
> > >
> > > ---------- Forwarded message ----------
> > > From: David Yan <david@datatorrent.com>
> > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > Subject: Re: Supporting iterations in Apex
> > > To: dev@apex.incubator.apache.org
> > >
> > >
> > > This delay operator will act as an input operator for the first window
> > > and act as a regular operator after that.
> > > The engine will increment the window id of the windows from all the
> > > output ports of the delay operator.
> > >
> > > We will need a new interface for the delay operator, extending the
> > > existing Operator interface.  The interface will probably include:
> > >
> > > - Emitting the tuples for the first window
> > > - Emitting the tuples after recovery
> > >
> > > We will provide a default implementation of the delay operator with a
> > > write-ahead log that stores the tuples for the window before each
> > > checkpoint for recovery.  We will also probably support the number of
> > > windows to delay using an operator property.
> > >
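
A rough sketch of what such an interface could look like, in plain Java with no Apex dependencies. Everything here is an assumption made for illustration: the names DelaySketch, LoggedDelaySketch, firstWindowTuples, tuplesAfterRecovery and outputWindowId are hypothetical, and an in-memory map stands in for the write-ahead log.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical interface; the names are illustrative and not the Apex API. */
interface DelaySketch<T> {
    /** Tuples to emit for the first window, when nothing has looped back yet. */
    List<T> firstWindowTuples();

    /** Tuples to re-emit after recovering from the given checkpoint window. */
    List<T> tuplesAfterRecovery(long checkpointWindowId);
}

/** Toy implementation; an in-memory map stands in for the write-ahead log. */
final class LoggedDelaySketch implements DelaySketch<String> {
    private final List<String> seed;
    private final Map<Long, List<String>> log = new HashMap<>();

    LoggedDelaySketch(List<String> seed) {
        this.seed = new ArrayList<>(seed);
    }

    /** Records a tuple received in the given window so it can be replayed later. */
    void record(long windowId, String tuple) {
        log.computeIfAbsent(windowId, w -> new ArrayList<>()).add(tuple);
    }

    @Override
    public List<String> firstWindowTuples() {
        return Collections.unmodifiableList(seed);
    }

    @Override
    public List<String> tuplesAfterRecovery(long checkpointWindowId) {
        return log.getOrDefault(checkpointWindowId, Collections.emptyList());
    }

    /** The +1 the engine would apply to window IDs on the delay's output ports. */
    static long outputWindowId(long inputWindowId) {
        return inputWindowId + 1;
    }

    public static void main(String[] args) {
        LoggedDelaySketch delay = new LoggedDelaySketch(Collections.singletonList("seed"));
        delay.record(14L, "a");
        System.out.println("first window: " + delay.firstWindowTuples());
        System.out.println("after recovery from 14: " + delay.tuplesAfterRecovery(14L));
        System.out.println("input window 14 leaves as window " + outputWindowId(14L));
    }
}
```

Keeping the two emit methods separate lets a user-supplied delay operator decide how tuples are stored without the engine knowing about the storage.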
> > > Let's look at this DAG with an iteration loop:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-----|
> > >
> > > With the delay operator, the physical view of the DAG looks like this,
> > > with D being the delay operator:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-D<--|
> > >
> > > There are two approaches for specifying the delay operator.
> > >
> > > 1) As discussed earlier on this thread, the delay operator can be
> > > specified as an *input port attribute* of A. The delay operator D will
> > > not appear in the logical DAG.  The engine will do the +1 on the window
> > > ID based on the presence of the input port attribute.  In this case, the
> > > delay operator does not need to specify any input port, just like the
> > > unifier, with the process(tuple) method implicitly taking in the tuples
> > > from the output port of B, which logically connects to the input port
> > > of A.
> > >
> > > 2) The delay operator is specified and connected *as any other operator*
> > > in the logical DAG.  The engine will do the +1 on the window ID if the
> > > operator implements the delay operator interface.  In this case, the
> > > delay operator D will need to specify at least one input port (just like
> > > a regular operator), and can actually have multiple input ports.
> > >
> > > I'm leaning toward the 2nd approach.
> > >
> > > Please share your thoughts.  Which one do you think is better?  Or maybe
> > > suggest a different approach altogether?
> > >
> > > Thanks!
> > >
> > > David
> > >
> > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <thomas@datatorrent.com>
> > > wrote:
> > >
> > >> Why not set the delay operator as an attribute? We already support
> > >> partitioners and stream codecs as attributes.
> > >>
> > >>
> > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pramod@datatorrent.com>
> > >> wrote:
> > >>
> > >> > How about making just the window delay an attribute on the input port?
> > >> > The operator connection is just like a normal DAG stream creation. We
> > >> > could also support connecting the same operator to multiple input ports
> > >> > with different delays and handle fault recovery accordingly.
> > >> >
> > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <david@datatorrent.com>
> > >> > wrote:
> > >> >
> > >> > > The iteration operator actually resembles the usage of unifiers.  We
> > >> > > have getUnifier in the interface of OutputPort.
> > >> > >
> > >> > > But if we add getDelayOperator in the interface of InputPort, that
> > >> > > would introduce backward incompatibility, especially since we can't
> > >> > > use the default implementation feature of interfaces that is in
> > >> > > Java 8.
> > >> > >
> > >> > > Putting the class object as an attribute of the InputPort is not
> > >> > > good either because you can't configure the delay operator itself.
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > David
> > >> > >
> > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <david@datatorrent.com>
> > >> > > wrote:
> > >> > >
> > >> > > > This is a very good idea.  This way, we can have a default
> > >> > > > implementation of that operator and the user can control how the
> > >> > > > tuples are stored by having his/her own implementation.  How many
> > >> > > > windows the operator delays is part of the implementation of that
> > >> > > > operator.
> > >> > > >
> > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET
> > >> > > > attribute and introducing a DELAY_OPERATOR_CLASS attribute so that
> > >> > > > the user can specify the delay operator class to be used.
> > >> > > >
> > >> > > > More thoughts?
> > >> > > >
> > >> > > > David
> > >> > > >
> > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gaurav@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> Hey David,
> > >> > > >>
> > >> > > >> I was thinking: can we add another operator in front of the input
> > >> > > >> port that has ITERATION_WINDOW_COUNT set? The new additional
> > >> > > >> operator will have a property whose value will be set equal to
> > >> > > >> ITERATION_WINDOW_COUNT, and it will be responsible for caching the
> > >> > > >> data for that many windows and delaying the data. This operator can
> > >> > > >> act as a unifier cum iterator operator. For this you may not need
> > >> > > >> any external storage agent, as platform checkpointing should help
> > >> > > >> you here.
> > >> > > >>
> > >> > > >> We are doing something similar for Sliding window.
> > >> > > >>
> > >> > > >> Thanks
> > >> > > >> -Gaurav
> > >> > > >>
> > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <david@datatorrent.com>
> > >> > > >> wrote:
> > >> > > >>
> > >> > > >> > Hi all,
> > >> > > >> >
> > >> > > >> > One current disadvantage of Apex is the inability to do iterations
> > >> > > >> > and machine learning algorithms because we don't allow loops in
> > >> > > >> > the application DAG (hence the name DAG).  I am proposing that we
> > >> > > >> > allow loops in the DAG if the loop advances the window ID by a
> > >> > > >> > configured amount.  A JIRA ticket has been created:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> > > >> >
> > >> > > >> > I have started this work in my fork at
> > >> > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60 .
> > >> > > >> >
> > >> > > >> > The current progress is that a simple test case works.  Major work
> > >> > > >> > still needs to be done with respect to recovery and partitioning.
> > >> > > >> >
> > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute of an input port
> > >> > > >> > of an operator.  If the value of the attribute is greater than or
> > >> > > >> > equal to 1, any tuples sent to the input port are treated as being
> > >> > > >> > ITERATION_WINDOW_COUNT windows ahead of what they are.
> > >> > > >> >
> > >> > > >> > For recovery, we will need to checkpoint all the tuples between
> > >> > > >> > ports in order to replay the looped tuples.  During recovery, if
> > >> > > >> > an operator that has an input port with ITERATION_WINDOW_COUNT=2
> > >> > > >> > is recovering from checkpoint window 14, the tuples for that input
> > >> > > >> > port from window 13 and window 14 need to be replayed and treated
> > >> > > >> > as window 15 and window 16 respectively (13+2 and 14+2).
> > >> > > >> >
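
The arithmetic in this example can be sketched in a few lines of plain Java. This is only an illustration under two assumptions: window IDs are treated as consecutive integers, as in the example, and the windows to replay are the last ITERATION_WINDOW_COUNT windows up to and including the checkpoint window. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the replay arithmetic; not Apex code. */
final class LoopReplaySketch {
    /**
     * Maps each source window that must be replayed to the window ID it is
     * treated as, assuming consecutive integer window IDs.
     */
    static Map<Long, Long> replayPlan(long checkpointWindowId, int iterationWindowCount) {
        if (iterationWindowCount < 1) {
            throw new IllegalArgumentException("iterationWindowCount must be >= 1");
        }
        Map<Long, Long> plan = new LinkedHashMap<>();
        long first = checkpointWindowId - iterationWindowCount + 1;
        for (long source = first; source <= checkpointWindowId; source++) {
            plan.put(source, source + iterationWindowCount);
        }
        return plan;
    }

    public static void main(String[] args) {
        // Checkpoint window 14 with ITERATION_WINDOW_COUNT=2.
        System.out.println(replayPlan(14L, 2)); // prints {13=15, 14=16}
    }
}
```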
> > >> > > >> > In other words, we need to store all the tuples from the window
> > >> > > >> > with ID committedWindowId minus ITERATION_WINDOW_COUNT onward for
> > >> > > >> > recovery, and purge the tuples earlier than that window.
> > >> > > >> > We can optimize this by only storing the tuples for the
> > >> > > >> > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
> > >> > > >> >
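
A minimal sketch of that retention rule in plain Java, assuming integer window IDs and an in-memory sorted map in place of real storage. LoopTupleStoreSketch and its methods are hypothetical names and have nothing to do with IdempotentStorageManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory store illustrating the purge rule; not Apex code. */
final class LoopTupleStoreSketch {
    private final TreeMap<Long, List<String>> byWindow = new TreeMap<>();
    private final int iterationWindowCount;

    LoopTupleStoreSketch(int iterationWindowCount) {
        this.iterationWindowCount = iterationWindowCount;
    }

    /** Stores a looped tuple under the window it was received in. */
    void store(long windowId, String tuple) {
        byWindow.computeIfAbsent(windowId, w -> new ArrayList<>()).add(tuple);
    }

    /** Purges every window earlier than committedWindowId - iterationWindowCount. */
    void committed(long committedWindowId) {
        byWindow.headMap(committedWindowId - iterationWindowCount).clear();
    }

    /** Window IDs still held, in ascending order. */
    List<Long> retainedWindows() {
        return new ArrayList<>(byWindow.keySet());
    }

    public static void main(String[] args) {
        LoopTupleStoreSketch store = new LoopTupleStoreSketch(2);
        for (long w = 10; w <= 16; w++) {
            store.store(w, "tuple-" + w);
        }
        store.committed(14L);
        System.out.println(store.retainedWindows()); // prints [12, 13, 14, 15, 16]
    }
}
```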
> > >> > > >> > For that, we need a storage mechanism for the tuples.  Chandni
> > >> > > >> > already has something that fits this use case in Apex Malhar.  The
> > >> > > >> > class is IdempotentStorageManager.  In order for this to be used
> > >> > > >> > in Apex Core, we need to deprecate the class in Apex Malhar and
> > >> > > >> > move it to Apex Core.
> > >> > > >> >
> > >> > > >> > A JIRA ticket has been created for this particular work:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> > > >> >
> > >> > > >> > Some of the above has been discussed among Thomas, Chetan,
> > >> > > >> > Chandni, and myself.
> > >> > > >> >
> > >> > > >> > For partitioning, we have not started any discussion or
> > >> > > >> > brainstorming.  We appreciate any feedback on this and any other
> > >> > > >> > aspect related to supporting iterations in general.
> > >> > > >> >
> > >> > > >> > Thanks!
> > >> > > >> >
> > >> > > >> > David
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> >
>
