apex-dev mailing list archives

From Amol Kekre <a...@datatorrent.com>
Subject Re: Supporting iterations in Apex
Date Tue, 03 Nov 2015 23:50:19 GMT
I just had a talk with David, and I am +1 for option 2, with delay as an
explicit operator.

Thks,
Amol


On Tue, Nov 3, 2015 at 3:34 PM, Timothy Farkas <tim@datatorrent.com> wrote:

> +1 for option 2
>
> Also, would it be possible to chain delay operators? Many stochastic and
> adaptive methods depend on finding the correlations between the current
> time step n and the previous k time steps: (n, n - 1), (n, n - 2),
> (n, n - 3), ..., (n, n - k).
>
> Here is a picture of a model that uses delay operators (in the picture
> these are represented by z^-1) and is used for time series prediction.
>
>
> https://upload.wikimedia.org/wikipedia/commons/thumb/d/d2/IIRFilter2.svg/250px-IIRFilter2.svg.png
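Each z^-1 box in the picture holds its input for one time step, so a chain of k of them exposes x[n-1] through x[n-k] at the same time. Below is a minimal, engine-independent Java sketch of such a chain. It assumes one time step corresponds to one streaming window, and the class names UnitDelay and TappedDelayLine are hypothetical, not Apex APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical unit delay (z^-1): outputs the value it received one step earlier. */
class UnitDelay {
  private double state; // value held over from the previous time step

  UnitDelay(double initial) {
    this.state = initial;
  }

  /** Stores the current input and returns the input from the previous step. */
  double step(double input) {
    double previous = state;
    state = input;
    return previous;
  }
}

/** Hypothetical chain of k unit delays; tap j (0-based) yields x[n - (j + 1)]. */
class TappedDelayLine {
  private final List<UnitDelay> delays = new ArrayList<>();

  TappedDelayLine(int k) {
    for (int i = 0; i < k; i++) {
      delays.add(new UnitDelay(0.0)); // zeros until enough history exists
    }
  }

  /** Feeds x[n] and returns [x[n-1], x[n-2], ..., x[n-k]]. */
  double[] step(double x) {
    double[] taps = new double[delays.size()];
    double signal = x;
    for (int j = 0; j < delays.size(); j++) {
      signal = delays.get(j).step(signal); // output of stage j feeds stage j + 1
      taps[j] = signal;
    }
    return taps;
  }
}
```

Feeding 1, 2, 3, 4 through a three-stage line yields taps [3, 2, 1] on the last step. Paired with the current value 4, these give (n, n - 1), (n, n - 2) and (n, n - 3).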
>
> On Tue, Nov 3, 2015 at 3:14 PM, Sasha Parfenov <sasha@datatorrent.com>
> wrote:
>
> > +1 for option 2.  Although option 2 doesn't mirror the current unifiers
> > the way option 1 does, and may look more complicated when viewing the
> > logical plan, I think the flexibility of specifying locality and the
> > ability to bring multiple downstream operators into a single delay
> > operator may be important for some projects.  For me the added
> > flexibility wins, particularly in light of efforts towards a simpler
> > high-level API.
> >
> >
> >
> > On Mon, Nov 2, 2015 at 11:25 AM, David Yan <david@datatorrent.com> wrote:
> >
> > > Please share your thoughts on this topic using the dev mailing list if
> > > you can.  Thanks.
> > >
> > > ---------- Forwarded message ----------
> > > From: David Yan <david@datatorrent.com>
> > > Date: Thu, Oct 29, 2015 at 11:11 AM
> > > Subject: Re: Supporting iterations in Apex
> > > To: dev@apex.incubator.apache.org
> > >
> > >
> > > This delay operator will act as an input operator for the first window
> > > and as a regular operator after that.
> > > The engine will increment the window ID of the windows from all the
> > > output ports of the delay operator.
> > >
> > > We will need a new interface for the delay operator, extending the
> > > existing Operator interface.  The interface will probably include:
> > >
> > > - Emitting the tuples for the first window
> > > - Emitting the tuples after recovery
> > >
> > > We will provide a default implementation of the delay operator with a
> > > write-ahead log that stores the tuples for the window before each
> > > checkpoint, for recovery.  We will probably also support configuring the
> > > number of windows to delay through an operator property.
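Here is a minimal sketch of the behavior described above, written in plain Java rather than against the Apex API. SketchDelayOperator, firstWindow, checkpointed and replayAfterRecovery are hypothetical names, and an in-memory TreeMap stands in for the write-ahead log. Tuples received in window w are emitted as window w + delayWindows. After recovery, the logged tuples whose shifted window lies past the checkpoint are emitted again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/**
 * Hypothetical delay operator sketch (not an Apex API). A tuple received in
 * window w is emitted downstream as part of window w + delayWindows.
 */
class SketchDelayOperator<T> {
  private final int delayWindows;        // would be an operator property
  private final BiConsumer<Long, T> out; // receives (window ID, tuple)
  // In-memory stand-in for the write-ahead log: window ID -> tuples received in it.
  private final TreeMap<Long, List<T>> log = new TreeMap<>();
  private long currentWindow;

  SketchDelayOperator(int delayWindows, BiConsumer<Long, T> out) {
    if (delayWindows < 1) throw new IllegalArgumentException("delayWindows must be >= 1");
    this.delayWindows = delayWindows;
    this.out = out;
  }

  /** First window: nothing has come around the loop yet, so behave like an input operator. */
  void firstWindow(long windowId, List<T> initialTuples) {
    for (T t : initialTuples) out.accept(windowId, t);
  }

  void beginWindow(long windowId) {
    currentWindow = windowId;
  }

  /** Regular operation: log the tuple, then emit it with an advanced window ID. */
  void process(T tuple) {
    log.computeIfAbsent(currentWindow, w -> new ArrayList<>()).add(tuple);
    out.accept(currentWindow + delayWindows, tuple);
  }

  /** After a checkpoint, only the last delayWindows windows can still need replaying. */
  void checkpointed(long checkpointWindowId) {
    log.headMap(checkpointWindowId - delayWindows + 1).clear();
  }

  /** After recovery, re-emit logged tuples whose shifted window lies past the checkpoint. */
  void replayAfterRecovery(long checkpointWindowId) {
    for (Map.Entry<Long, List<T>> e : log.headMap(checkpointWindowId, true).entrySet()) {
      long shifted = e.getKey() + delayWindows;
      if (shifted <= checkpointWindowId) continue;
      for (T t : e.getValue()) out.accept(shifted, t);
    }
  }
}
```

With delayWindows = 2, tuples logged in windows 13 and 14 are replayed as windows 15 and 16 after recovery from checkpoint 14. This matches the arithmetic in the original proposal at the bottom of this thread.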
> > >
> > > Let's look at this DAG with an iteration loop:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-----|
> > >
> > > With the delay operator, the physical view of the DAG looks like this,
> > > with D being the delay operator:
> > >
> > > upstream --> A --> B --> downstream
> > >              ^     |
> > >              |-D<--|
> > >
> > > There are two approaches for specifying the delay operator.
> > >
> > > 1) As discussed earlier on this thread, the delay operator can be
> > > specified as an *input port attribute* of A.  The delay operator D will
> > > not appear in the logical DAG.  The engine will do the +1 on the window
> > > ID based on the presence of the input port attribute.  In this case, the
> > > delay operator does not need to specify any input port, just like the
> > > unifier, with the process(tuple) method implicitly taking in the tuples
> > > from the output port of B, which logically connects to the input port
> > > of A.
> > >
> > > 2) The delay operator is specified and connected *as any other operator*
> > > in the logical DAG.  The engine will do the +1 on the window ID if the
> > > operator implements the delay operator interface.  In this case, the
> > > delay operator D will need to specify at least one input port (just like
> > > a regular operator), and can actually have multiple input ports.
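Under option 2, the rule from the original proposal (loops are allowed only if they advance the window ID) amounts to a simple check. The logical DAG must become acyclic once the streams leaving delay operators are cut. The sketch below performs that check with Kahn's topological sort. LoopValidator and its methods are hypothetical and are not the Apex DAG validator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical logical-plan check: every loop must pass through a delay operator. */
class LoopValidator {
  private final Map<String, List<String>> streams = new HashMap<>();
  private final Set<String> delayOperators = new HashSet<>();

  void addOperator(String name, boolean isDelayOperator) {
    streams.putIfAbsent(name, new ArrayList<>());
    if (isDelayOperator) delayOperators.add(name);
  }

  void addStream(String from, String to) {
    streams.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    streams.computeIfAbsent(to, k -> new ArrayList<>());
  }

  /** True if the DAG is acyclic once streams leaving delay operators are ignored. */
  boolean isValid() {
    Map<String, Integer> inDegree = new HashMap<>();
    for (String op : streams.keySet()) inDegree.put(op, 0);
    for (Map.Entry<String, List<String>> e : streams.entrySet()) {
      if (delayOperators.contains(e.getKey())) continue; // cut the loop at the delay operator
      for (String to : e.getValue()) inDegree.merge(to, 1, Integer::sum);
    }
    Deque<String> ready = new ArrayDeque<>();
    for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
      if (e.getValue() == 0) ready.add(e.getKey());
    }
    int visited = 0;
    while (!ready.isEmpty()) {
      String op = ready.poll();
      visited++;
      if (delayOperators.contains(op)) continue;
      for (String to : streams.get(op)) {
        if (inDegree.merge(to, -1, Integer::sum) == 0) ready.add(to);
      }
    }
    // Any operator that never reaches in-degree 0 sits on a loop without a delay operator.
    return visited == streams.size();
  }
}
```

For the example DAG, upstream -> A -> B -> downstream with B -> D -> A passes the check. Connecting B straight back to A fails it.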
> > >
> > > I'm leaning toward the 2nd approach.
> > >
> > > Please share your thoughts.  Which one do you think is better?  Or would
> > > you suggest a different approach altogether?
> > >
> > > Thanks!
> > >
> > > David
> > >
> > > On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <thomas@datatorrent.com>
> > > wrote:
> > >
> > >> Why not set the delay operator as an attribute? We already support
> > >> partitioners and stream codecs as attributes.
> > >>
> > >>
> > >> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pramod@datatorrent.com> wrote:
> > >>
> > >> > How about making just the window delay an attribute on the input port?
> > >> > The operator connection is just like a normal DAG stream creation.  We
> > >> > could also support connecting the same operator to multiple input ports
> > >> > with different delays and handle fault recovery accordingly.
> > >> >
> > >> > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <david@datatorrent.com> wrote:
> > >> >
> > >> > > The iteration operator actually resembles the usage of unifiers.  We
> > >> > > have getUnifier in the interface of OutputPort.
> > >> > >
> > >> > > But if we add getDelayOperator to the interface of InputPort, that
> > >> > > would break backward compatibility, especially since we can't use the
> > >> > > default method feature that interfaces gained in Java 8.
> > >> > >
> > >> > > Putting the class object as an attribute of the InputPort is not good
> > >> > > either, because you can't configure the delay operator itself.
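The compatibility concern works as follows. On Java 7, adding getDelayOperator() to an existing interface forces every existing implementation to change. A Java 8 default method would avoid that. The sketch below uses hypothetical types (SketchInputPort, SketchDelay), not the actual Apex InputPort, and compiles only on Java 8 or later.

```java
/** Hypothetical type standing in for a configured delay operator; not an Apex class. */
class SketchDelay {
  final int windows;

  SketchDelay(int windows) {
    this.windows = windows;
  }
}

/** Hypothetical input-port interface, not the Apex InputPort. */
interface SketchInputPort<T> {
  void process(T tuple);

  // Java 8+: a default method lets implementations written before this method
  // existed keep compiling. On Java 7 it would have to be abstract, and every
  // existing implementation would fail to compile until it added the method.
  default SketchDelay getDelayOperator() {
    return null; // null means "no delay operator on this port"
  }
}

/** Written before getDelayOperator existed; compiles unchanged thanks to the default. */
class ExistingPort implements SketchInputPort<String> {
  final StringBuilder seen = new StringBuilder();

  @Override
  public void process(String tuple) {
    seen.append(tuple);
  }
}

/** A loop-back port that opts in by overriding the default. */
class LoopBackPort implements SketchInputPort<String> {
  @Override
  public void process(String tuple) {
    // would hand the tuple to the operator's logic
  }

  @Override
  public SketchDelay getDelayOperator() {
    return new SketchDelay(1);
  }
}
```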
> > >> > >
> > >> > > Thoughts?
> > >> > >
> > >> > > David
> > >> > >
> > >> > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <david@datatorrent.com> wrote:
> > >> > >
> > >> > > > This is a very good idea.  This way, we can have a default
> > >> > > > implementation of that operator, and the user can control how the
> > >> > > > tuples are stored by providing his/her own implementation.  How many
> > >> > > > windows the operator delays is part of the implementation of that
> > >> > > > operator.
> > >> > > >
> > >> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute
> > >> > > > and introducing a DELAY_OPERATOR_CLASS attribute so that the user can
> > >> > > > specify the delay operator class to be used.
> > >> > > >
> > >> > > > More thoughts?
> > >> > > >
> > >> > > > David
> > >> > > >
> > >> > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gaurav@datatorrent.com> wrote:
> > >> > > >
> > >> > > >> Hey David,
> > >> > > >>
> > >> > > >> I was wondering whether we can add another operator in front of the
> > >> > > >> input port that has ITERATION_WINDOW_COUNT set.  The new operator
> > >> > > >> will have a property whose value will be set equal to
> > >> > > >> ITERATION_WINDOW_COUNT, and it will be responsible for caching the
> > >> > > >> data for that many windows and delaying it.  This operator can act
> > >> > > >> as a unifier cum iterator operator.  For this, you may not need any
> > >> > > >> external storage agent, as platform checkpointing should help you
> > >> > > >> here.
> > >> > > >>
> > >> > > >> We are doing something similar for Sliding window.
> > >> > > >>
> > >> > > >> Thanks
> > >> > > >> -Gaurav
> > >> > > >>
> > >> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <david@datatorrent.com> wrote:
> > >> > > >>
> > >> > > >> > Hi all,
> > >> > > >> >
> > >> > > >> > One current disadvantage of Apex is the inability to support
> > >> > > >> > iterations and machine learning algorithms, because we don't allow
> > >> > > >> > loops in the application DAG (hence the name DAG).  I am proposing
> > >> > > >> > that we allow loops in the DAG if the loop advances the window ID by
> > >> > > >> > a configured amount.  A JIRA ticket has been created:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> > > >> >
> > >> > > >> > I have started this work in my fork at
> > >> > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60 .
> > >> > > >> >
> > >> > > >> > The current progress is that a simple test case works.  Major work
> > >> > > >> > still needs to be done with respect to recovery and partitioning.
> > >> > > >> >
> > >> > > >> > The value ITERATION_WINDOW_COUNT is an attribute of an input port of
> > >> > > >> > an operator.  If the value of the attribute is greater than or equal
> > >> > > >> > to 1, any tuples sent to the input port are treated as being
> > >> > > >> > ITERATION_WINDOW_COUNT windows ahead of their actual window.
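Here is a sketch of the per-port window shift. The same mapping would also cover connecting several ports with different delays. IterationWindowShift is a hypothetical helper, and a map from port name to count stands in for the ITERATION_WINDOW_COUNT attribute.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-port window shifting; this class and the port names are illustrative only. */
class IterationWindowShift {
  // Stand-in for the ITERATION_WINDOW_COUNT attribute, keyed by input port name.
  private final Map<String, Integer> iterationWindowCount = new HashMap<>();

  void setIterationWindowCount(String port, int count) {
    if (count < 0) throw new IllegalArgumentException("count must be >= 0");
    iterationWindowCount.put(port, count);
  }

  /** Window a tuple is treated as belonging to when it arrives on the given port. */
  long effectiveWindow(String port, long arrivalWindowId) {
    // A count >= 1 marks a loop-back port; 0 (or unset) leaves the window unchanged.
    int count = iterationWindowCount.getOrDefault(port, 0);
    return arrivalWindowId + count;
  }
}
```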
> > >> > > >> >
> > >> > > >> > For recovery, we will need to checkpoint all the tuples between
> > >> > > >> > ports in order to replay the looped tuples.  During recovery, if an
> > >> > > >> > operator that has an input port with ITERATION_WINDOW_COUNT=2 is
> > >> > > >> > recovering from checkpoint window 14, the tuples for that input port
> > >> > > >> > from window 13 and window 14 need to be replayed and treated as
> > >> > > >> > window 15 and window 16 respectively (13+2 and 14+2).
> > >> > > >> >
> > >> > > >> > In other words, we need to store all the tuples from the window with
> > >> > > >> > ID committedWindowId minus ITERATION_WINDOW_COUNT onward for
> > >> > > >> > recovery, and purge the tuples earlier than that window.
> > >> > > >> > We can optimize this by only storing the tuples for the
> > >> > > >> > ITERATION_WINDOW_COUNT windows prior to each checkpoint.
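The following is a sketch of the optimized retention rule. It assumes, based on the recovery example above, that "the ITERATION_WINDOW_COUNT windows prior to a checkpoint" means windows checkpoint - count + 1 through checkpoint. LoopTupleRetention is a hypothetical helper.

```java
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical helper for the optimized retention rule; not part of Apex. */
final class LoopTupleRetention {
  private LoopTupleRetention() {}

  /**
   * Windows whose looped tuples must be stored: the count windows ending at
   * each checkpoint (assumed inclusive of the checkpoint window itself).
   */
  static SortedSet<Long> windowsToStore(List<Long> checkpointWindowIds, int count) {
    SortedSet<Long> windows = new TreeSet<>();
    for (long checkpoint : checkpointWindowIds) {
      for (long w = checkpoint - count + 1; w <= checkpoint; w++) {
        windows.add(w); // the set merges overlaps when checkpoints are close together
      }
    }
    return windows;
  }
}
```

With a count of 2 and checkpoints at windows 14 and 24, only windows 13, 14, 23 and 24 need to be stored.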
> > >> > > >> >
> > >> > > >> > For that, we need a storage mechanism for the tuples.  Chandni
> > >> > > >> > already has something in Apex Malhar that fits this use case: the
> > >> > > >> > class IdempotentStorageManager.  In order for it to be used in Apex
> > >> > > >> > Core, we need to deprecate the class in Apex Malhar and move it to
> > >> > > >> > Apex Core.
> > >> > > >> >
> > >> > > >> > A JIRA ticket has been created for this particular work:
> > >> > > >> >
> > >> > > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> > > >> >
> > >> > > >> > Some of the above has been discussed among Thomas, Chetan, Chandni,
> > >> > > >> > and myself.
> > >> > > >> >
> > >> > > >> > For partitioning, we have not started any discussion or
> > >> > > >> > brainstorming.  We would appreciate any feedback on this and on any
> > >> > > >> > other aspect related to supporting iterations in general.
> > >> > > >> >
> > >> > > >> > Thanks!
> > >> > > >> >
> > >> > > >> > David
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> >
>
