apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: Supporting iterations in Apex
Date Tue, 03 Nov 2015 22:29:48 GMT
+1 to option #2 - the delay operator is an explicit operator in the DAG
(with D and A dropped from the name after the feature is implemented :-) ).
After discussion with David, I think it is the more flexible option, and it
can be simplified if necessary by using a higher-level API.

Thank you,

Vlad

On 10/29/15 11:11, David Yan wrote:
> This delay operator will act as an input operator for the first window and
> act as a regular operator after that.
> The engine will increment the window id of the windows from all the output
> ports of the delay operator.
>
> We will need a new interface for the delay operator, extending the existing
> Operator interface.  The interface will probably include:
>
> - Emitting the tuples for the first window
> - Emitting the tuples after recovery
>
> We will provide a default implementation of the delay operator with a
> write-ahead log that stores the tuples for the window before each
> checkpoint for recovery.  We will also probably support configuring the
> number of windows to delay through an operator property.
>
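To make the proposed interface concrete, here is a minimal Java sketch of the two extra responsibilities listed above (emitting tuples for the first window and emitting tuples after recovery). Every name in it (DelayOperatorSketch, BufferingDelaySketch, emitFirstWindow, emitAfterRecovery) is hypothetical and not part of the Apex API, and an in-memory list stands in for the write-ahead log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface; the names are illustrative, not the Apex API.
interface DelayOperatorSketch<T> {
    // Called once, in place of regular input, to seed the first window.
    void emitFirstWindow(Consumer<T> output);

    // Called after a restart to re-emit the tuples buffered for recovery.
    void emitAfterRecovery(Consumer<T> output);

    // Regular per-tuple processing for every later window.
    void process(T tuple, Consumer<T> output);
}

// Assumption: an in-memory buffer stands in for the write-ahead log.
class BufferingDelaySketch<T> implements DelayOperatorSketch<T> {
    private final List<T> seed;
    private final List<T> currentWindow = new ArrayList<>();

    BufferingDelaySketch(List<T> seed) {
        this.seed = new ArrayList<>(seed);
    }

    @Override
    public void emitFirstWindow(Consumer<T> output) {
        seed.forEach(output);
    }

    @Override
    public void emitAfterRecovery(Consumer<T> output) {
        currentWindow.forEach(output);
    }

    @Override
    public void process(T tuple, Consumer<T> output) {
        currentWindow.add(tuple); // remember the tuple so it can be replayed
        output.accept(tuple);     // pass it on unchanged
    }

    // Window boundary hook: a new window starts, so the buffer is reset.
    void beginWindow() {
        currentWindow.clear();
    }
}
```

A real implementation would persist the buffer and would leave the window ID increment to the engine, as described above.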
> Let's look at this DAG with an iteration loop:
>
> upstream --> A --> B --> downstream
>               ^     |
>               |-----|
>
> With the delay operator, the physical view of the DAG looks like this with
> D being the delay operator:
>
> upstream --> A --> B --> downstream
>               ^     |
>               |-D<--|
>
> There are two approaches for specifying the delay operator.
>
> 1) As discussed earlier on this thread, the delay operator can be specified
> as an *input port attribute* of A. The delay operator D will not appear in
> the logical DAG.  The engine will do the +1 on the window ID based on the
> presence of the input port attribute.  In this case, the delay operator
> does not need to specify any input port, just like the unifier, with the
> process(tuple) method implicitly taking in the tuples from the output port
> of B, which logically connects to the input port of A.
>
> 2) The delay operator is specified and connected *as any other operator* in
> the logical DAG.  The engine will do the +1 on the window ID if the
> operator implements the delay operator interface.  In this case, the delay
> operator D will need to specify at least one input port (just like a
> regular operator), and can actually have multiple input ports.
>
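One consequence of approach 2 is that plan validation could accept a loop only when it passes through a delay operator. The sketch below shows one possible check on a toy graph model: ignore the streams leaving delay operators and verify that what remains is acyclic. LoopCheckSketch and its methods are hypothetical and are not the Apex DAG API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical logical-plan model; not the Apex DAG API.
class LoopCheckSketch {
    private final Map<String, List<String>> edges = new HashMap<>();
    private final Set<String> delayOperators = new HashSet<>();

    void addStream(String from, String to) {
        edges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        edges.computeIfAbsent(to, k -> new ArrayList<>());
    }

    void markDelay(String name) {
        delayOperators.add(name);
    }

    // True if no cycle remains once streams leaving delay operators are ignored.
    boolean everyLoopHasDelay() {
        Set<String> done = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String node : edges.keySet()) {
            if (hasCycle(node, done, onPath)) {
                return false;
            }
        }
        return true;
    }

    // Depth-first search that treats delay operators as having no outputs.
    private boolean hasCycle(String node, Set<String> done, Set<String> onPath) {
        if (onPath.contains(node)) {
            return true;
        }
        if (done.contains(node)) {
            return false;
        }
        if (delayOperators.contains(node)) {
            done.add(node); // the loop is cut here
            return false;
        }
        onPath.add(node);
        for (String next : edges.get(node)) {
            if (hasCycle(next, done, onPath)) {
                return true;
            }
        }
        onPath.remove(node);
        done.add(node);
        return false;
    }
}
```

With the example DAG above, adding the streams B to D and D to A and marking D as a delay operator passes the check, while connecting B directly to A does not.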
> I'm leaning toward the 2nd approach.
>
> Please share your thoughts.  Which one do you think is better?  Or maybe
> suggest a different approach altogether?
>
> Thanks!
>
> David
>
> On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <thomas@datatorrent.com>
> wrote:
>
>> Why not set the delay operator as an attribute? We already support
>> partitioners and stream codecs as attributes.
>>
>>
>> On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pramod@datatorrent.com>
>> wrote:
>>
>>> How about making just the window delay an attribute on the input port?
>>> The operator connection is just like a normal DAG stream creation. We
>>> could also support connecting the same operator to multiple input ports
>>> with different delays and handle fault recovery accordingly.
>>>
>>> On Wed, Oct 7, 2015 at 9:53 AM, David Yan <david@datatorrent.com> wrote:
>>>
>>>> The iteration operator actually resembles the usage of unifiers.  We
>>>> have getUnifier in the interface of OutputPort.
>>>>
>>>> But if we add getDelayOperator to the interface of InputPort, that
>>>> would introduce a backward incompatibility, especially since we can't
>>>> use the default method feature of interfaces that is in Java 8.
>>>>
>>>> Putting the class object as an attribute of the InputPort is not good
>>>> either because you can't configure the delay operator itself.
>>>>
>>>> Thoughts?
>>>>
>>>> David
>>>>
>>>> On Fri, Sep 25, 2015 at 10:10 AM, David Yan <david@datatorrent.com>
>>>> wrote:
>>>>> This is a very good idea.  This way, we can have a default
>>>>> implementation of that operator and the user can control how the
>>>>> tuples are stored by having his/her own implementation.  How many
>>>>> windows the operator delays is part of the implementation of that
>>>>> operator.
>>>>>
>>>>> I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute
>>>>> and introducing a DELAY_OPERATOR_CLASS attribute so that the user can
>>>>> specify the delay operator class to be used.
>>>>>
>>>>> More thoughts?
>>>>>
>>>>> David
>>>>>
>>>>> On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gaurav@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Hey David,
>>>>>>
>>>>>> I was thinking: can we add another operator in front of the input
>>>>>> port that has ITERATION_WINDOW_COUNT set? The new operator will have
>>>>>> a property whose value is set equal to ITERATION_WINDOW_COUNT, and it
>>>>>> will be responsible for caching the data for that many windows and
>>>>>> delaying it. This operator can act as a unifier-cum-iterator
>>>>>> operator. For this you may not need any external storage agent, as
>>>>>> platform checkpointing should help you here.
>>>>>>
>>>>>> We are doing something similar for Sliding window.
>>>>>>
>>>>>> Thanks
>>>>>> -Gaurav
>>>>>>
>>>>>> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <david@datatorrent.com>
>>>>>> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> One current disadvantage of Apex is the inability to support
>>>>>>> iterations and machine learning algorithms because we don't allow
>>>>>>> loops in the application DAG (hence the name DAG).  I am proposing
>>>>>>> that we allow loops in the DAG if the loop advances the window ID by
>>>>>>> a configured amount.  A JIRA ticket has been created:
>>>>>>>
>>>>>>> https://malhar.atlassian.net/browse/APEX-60
>>>>>>>
>>>>>>> I have started this work in my fork at
>>>>>>> https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
>>>>>>>
>>>>>>> The current progress is that a simple test case works.  Major work
>>>>>>> still needs to be done with respect to recovery and partitioning.
>>>>>>>
>>>>>>> ITERATION_WINDOW_COUNT is an attribute of an input port of an
>>>>>>> operator.  If the value of the attribute is greater than or equal to
>>>>>>> 1, any tuples sent to the input port are treated as being
>>>>>>> ITERATION_WINDOW_COUNT windows ahead of what they are.
>>>>>>>
>>>>>>> For recovery, we will need to checkpoint all the tuples sent to
>>>>>>> ports with this attribute so that the looped tuples can be replayed.
>>>>>>> During recovery, if an operator that has an input port with
>>>>>>> ITERATION_WINDOW_COUNT=2 is recovering from checkpoint window 14,
>>>>>>> the tuples for that input port from window 13 and window 14 need to
>>>>>>> be replayed and treated as window 15 and window 16 respectively
>>>>>>> (13+2 and 14+2).
>>>>>>>
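The replay arithmetic above (13+2 and 14+2) can be sketched in a few lines of Java. This is only an illustration: ReplayPlanSketch and plan are hypothetical names, and window IDs are assumed to be plain consecutive integers, as in the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; window IDs are assumed to be consecutive integers.
class ReplayPlanSketch {
    // Maps each stored source window to the window ID it is replayed as.
    static Map<Long, Long> plan(long checkpointWindowId, int iterationWindowCount) {
        if (iterationWindowCount < 1) {
            throw new IllegalArgumentException("ITERATION_WINDOW_COUNT must be >= 1");
        }
        Map<Long, Long> plan = new LinkedHashMap<>();
        // The last iterationWindowCount windows up to the checkpoint are replayed.
        long first = checkpointWindowId - iterationWindowCount + 1;
        for (long source = first; source <= checkpointWindowId; source++) {
            plan.put(source, source + iterationWindowCount);
        }
        return plan;
    }
}
```

For checkpoint window 14 and ITERATION_WINDOW_COUNT=2, plan(14, 2) maps window 13 to 15 and window 14 to 16.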
>>>>>>> In other words, we need to store all the tuples from the window with
>>>>>>> ID committedWindowId minus ITERATION_WINDOW_COUNT for recovery and
>>>>>>> purge the tuples earlier than that window.
>>>>>>> We can optimize this by only storing the tuples for
>>>>>>> ITERATION_WINDOW_COUNT windows prior to any checkpoint.
>>>>>>>
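The retention rule above (keep tuples from window committedWindowId minus ITERATION_WINDOW_COUNT, purge anything earlier) could look roughly like the following Java sketch. LoopTupleStoreSketch is a hypothetical in-memory stand-in; it is not IdempotentStorageManager and does not implement the further optimization of storing tuples only around checkpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory store; a real one would write to durable storage.
class LoopTupleStoreSketch<T> {
    private final TreeMap<Long, List<T>> tuplesByWindow = new TreeMap<>();
    private final int iterationWindowCount;

    LoopTupleStoreSketch(int iterationWindowCount) {
        this.iterationWindowCount = iterationWindowCount;
    }

    void save(long windowId, T tuple) {
        tuplesByWindow.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
    }

    // Drops every window earlier than committedWindowId - iterationWindowCount.
    void committed(long committedWindowId) {
        long oldestKept = committedWindowId - iterationWindowCount;
        tuplesByWindow.headMap(oldestKept, false).clear();
    }

    List<Long> storedWindows() {
        return new ArrayList<>(tuplesByWindow.keySet());
    }
}
```

With ITERATION_WINDOW_COUNT=2 and tuples saved for windows 10 through 14, calling committed(14) leaves windows 12, 13 and 14 in the store.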
>>>>>>> For that, we need a storage mechanism for the tuples.  Chandni
>>>>>>> already has something that fits this use case in Apex Malhar.  The
>>>>>>> class is IdempotentStorageManager.  In order for this to be used in
>>>>>>> Apex Core, we need to deprecate the class in Apex Malhar and move it
>>>>>>> to Apex Core.
>>>>>>> A JIRA ticket has been created for this particular work:
>>>>>>>
>>>>>>> https://malhar.atlassian.net/browse/APEX-128
>>>>>>>
>>>>>>> Some of the above has been discussed among Thomas, Chetan, Chandni,
>>>>>>> and myself.
>>>>>>>
>>>>>>> For partitioning, we have not started any discussion or
>>>>>>> brainstorming.  We appreciate any feedback on this and any other
>>>>>>> aspect related to supporting iterations in general.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>

