apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: APEXCORE-619 Recovery windowId in future during application relaunch.
Date Tue, 07 Mar 2017 08:22:40 GMT
Thank you all for the feedback.

Some useful output operators can be stateless: they simply push the data
received in a window to the output store. Examples are
KafkaOutputOperator and JDBCOutputOperator, or any output store where
writes are idempotent, which covers most key-value stores.

I was going to use the existing logic to compute the committedWindowId, with
the addition of a few steps explained below.
solution-1
- Calculate committedWindowId with the leaf operators' checkpoints set to the
current timestamp (current behaviour).
- Update the leaf operators' recoveryWindowId to committedWindowId.
- Calculate committedWindowId again. This step is required because the
downstream operators' recoveryWindowId has been reduced, and hence we may have
to adjust the recoveryWindowId of upstream operators.
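
The three steps above can be sketched on a toy DAG as follows. This is only an
illustration, not the Apex implementation: the function names
(recovery_windows, relaunch_recovery), the dictionary-based DAG, the operator
names and the window ids are all hypothetical, and the rule "a stateful
operator recovers from its latest checkpoint that is not newer than any
downstream recovery window" is a simplified assumption about the existing
logic.

```python
INITIAL_WINDOW = 0  # hypothetical marker for "no usable checkpoint"


def recovery_windows(dag, checkpoints, now_window, leaf_window=None):
    """Toy model: return {operator: recoveryWindowId}.

    dag         -- {operator: [downstream operators]}, leaves map to []
    checkpoints -- {operator: [checkpointed window ids]}; stateless
                   operators have no entry
    now_window  -- window id derived from the current timestamp
    leaf_window -- if given, used for stateless leaves instead of now_window
    """
    recovery = {}

    def visit(op):
        if op in recovery:
            return recovery[op]
        # An operator must not recover ahead of any downstream operator.
        cap = min((visit(d) for d in dag[op]), default=None)
        if op in checkpoints:
            usable = [w for w in checkpoints[op] if cap is None or w <= cap]
            window = max(usable, default=INITIAL_WINDOW)
        elif cap is not None:
            window = cap  # intermediate stateless operator follows downstream
        else:
            window = now_window if leaf_window is None else leaf_window
        recovery[op] = window
        return window

    for op in dag:
        visit(op)
    return recovery


def relaunch_recovery(dag, checkpoints, now_window):
    # Step 1: current behaviour, stateless leaves sit at the current timestamp.
    first = recovery_windows(dag, checkpoints, now_window)
    committed = min(first.values())
    # Steps 2 and 3: pull stateless leaves back to committedWindowId and
    # recompute, so that upstream operators are lowered where needed.
    second = recovery_windows(dag, checkpoints, now_window, leaf_window=committed)
    return min(second.values()), first, second


# Hypothetical DAG: Input feeds Parse and Store; Parse feeds a stateless Console.
dag = {"Input": ["Parse", "Store"], "Parse": ["Console"], "Store": [], "Console": []}
checkpoints = {"Input": [10, 20, 30], "Parse": [10, 20, 30], "Store": [10, 20]}

committed, first, second = relaunch_recovery(dag, checkpoints, now_window=1000)
print(first)
print(second)
```

In this example the first pass leaves Console at window 1000 and Parse at 30,
while the second pass moves both to the committed window 20.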

This will prevent stateless leaf operators from starting at the current
timestamp, and hence reduce the amount of data loss. But given the concern
raised by Bhupesh about the last stateless operator being slow, the solution
suggested by Vlad is sufficient.

solution-1
- As explained above. If a little data loss is acceptable, we could go with this
approach.
solution-2
- Fail validation if the last operator is stateless in the AT_LEAST_ONCE scenario,
as suggested by Vlad.
  This could break backward compatibility, as old applications will fail to
launch.
solution-3
- Mark the last operator stateful in the AT_LEAST_ONCE scenario.
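
Solutions 2 and 3 share the same check and differ only in what happens when a
stateless leaf is found. A minimal sketch, under assumptions: the helper names
(stateless_leaves, validate, mark_leaves_stateful) and the toy DAG are
hypothetical, and the processing mode is treated as a single application-wide
string for brevity, which is a simplification.

```python
def stateless_leaves(dag, stateless):
    """Return stateless operators that have no downstream operators."""
    return sorted(op for op, downstream in dag.items()
                  if not downstream and op in stateless)


def validate(dag, stateless, processing_mode):
    """solution-2: refuse to launch a DAG that has stateless leaves."""
    if processing_mode != "AT_LEAST_ONCE":
        return
    offenders = stateless_leaves(dag, stateless)
    if offenders:
        raise ValueError("stateless leaf operators with AT_LEAST_ONCE: "
                         + ", ".join(offenders))


def mark_leaves_stateful(dag, stateless, processing_mode):
    """solution-3: return the stateless set with the leaves removed."""
    if processing_mode != "AT_LEAST_ONCE":
        return set(stateless)
    return set(stateless) - set(stateless_leaves(dag, stateless))


# Hypothetical application: a stateless parser feeding a stateless console output.
dag = {"Input": ["Parse"], "Parse": ["Console"], "Console": []}
stateless = {"Parse", "Console"}

try:
    validate(dag, stateless, "AT_LEAST_ONCE")
    launch_rejected = False
except ValueError:
    launch_rejected = True

still_stateless = mark_leaves_stateful(dag, stateless, "AT_LEAST_ONCE")
```

With solution-2 this application is rejected at launch, which is the backward
compatibility risk mentioned above. With solution-3 it launches, and only
Parse remains stateless.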

Let me know about your preference.

Regards,
- Tushar.


On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> For a long chain of stateless operators at the end of a DAG, it is
> possible that time to propagate the end window to a leaf operator is
> greater than the time for a checkpoint to be persisted in HDFS.
>
> If at least once processing guarantee is necessary, the leaf operators
> should not be STATELESS. Will invalidating DAG that has one or more leaf
> operator marked as STATELESS with AT_LEAST_ONCE processing solve
> APEXCORE-619? It is not the best solution, but I think it is sufficient for
> the described scenario.
>
> Thank you,
>
> Vlad
>
>
> On 3/2/17 08:43, Thomas Weise wrote:
>
>> Good point, that's correct for a stateless leaf operator (operator that
>> does not have downstream operators). The minimum of upstream checkpoints
>> can be higher than the last windowId seen by the leaf operator. Although
>> that is a low probability, because it would mean the time it took for the
>> checkpoint to become visible in HDFS is less than propagation of endWindow
>> downstream.
>>
>> It's also not a problem for an intermediate stateless operator, because the
>> downstream checkpoint will inform the recovery windowId. Most of the time
>> stateless operators are intermediate.
>>
>> Leaf operators are the output operators. I suspect in the original scenario
>> it was a console output operator? Useful output operators usually won't be
>> stateless, they have to track state to interact with the external system
>> correctly. I'm bringing this up for adequate cost/benefit analysis.
>>
>> In absence of stateful downstream operator, you only have the committed
>> windowId, which is essentially a checkpointing watermark. On application
>> restart it has to be recomputed from the checkpoints available, and does
>> not cover the scenario Tushar reported originally.
>>
>> Saving committed windowId comes at a cost, it would have to be written to
>> the journal before operators are notified. Care has been taken to not write
>> unnecessarily to the journal, as it is blocking I/O and in this case the
>> frequency depends on the order of arrival of checkpoint notifications from
>> operators. We also don't want to delay committedWindow notification, as that
>> would introduce latency.
>>
>> Thomas
>>
>>
>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>> wrote:
>>
>>> What if all operators complete their first checkpoints but the stateless
>>> operator could not cross the first checkpoint window, and the DAG crashed?
>>> If we try to figure out the recovery checkpoint now, we might conclude that
>>> checkpoint 1 is the point to start and we may miss some data getting
>>> processed by the stateless operator. Probably in this case at-least-once is
>>> also not guaranteed?
>>>
>>> ~ Bhupesh
>>>
>>>
>>> _______________________________________________________
>>>
>>> Bhupesh Chawda
>>>
>>> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>>>
>>> www.datatorrent.com  |  apex.apache.org
>>>
>>>
>>>
>>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <thw@apache.org> wrote:
>>>
>>>> Dummy checkpoints, continuously writing committed window id and the like
>>>> all introduce overhead that is probably not needed.
>>>>
>>>> All the information to derive what we need is likely available and IMO the
>>>> discussion should be on what is the correct way of using it. I will have a
>>>> look when I get to it as well.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sandesh@datatorrent.com>
>>>> wrote:
>>>>
>>>>> Instead of treating the stateless operator in a special way and missing
>>>>> corner cases, just have a dummy checkpoint, then there is no need to handle
>>>>> corner cases.
>>>>>
>>>>> There is a name for this solution,
>>>>> https://en.wikipedia.org/wiki/Null_Object_pattern
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pramod@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> There is code in various places that deals with stateless operators in a
>>>>>> special way even though a physical checkpoint does not exist on the disk.
>>>>>> It is probably a matter of applying similar thought process/logic correctly
>>>>>> here.
>>>>>>
>>>>>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <amol@datatorrent.com> wrote:
>>>>>>
>>>>>>> Hmm! The fact that commitWindowId has moved up (right now in memory of
>>>>>>> Stram) should mean that a complete set of checkpoints is available, i.e.
>>>>>>> commitWindowId can be derived. Let's say that the next checkpoint window
>>>>>>> also gets checkpointed across the app, and commitWindowId is in memory but
>>>>>>> not written to stram-state yet; then upon relaunch the latest
>>>>>>> commitWindowId should get computed correctly.
>>>>>>>
>>>>>>> This may be just about setting stateless operators to commitWindowId on
>>>>>>> re-launch? aka bug/feature?
>>>>>>>
>>>>>>> Thks
>>>>>>> Amol
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> E: amol@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>
>>>>>>> *Join us at Apex Big Data World-San Jose
>>>>>>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>>>>>>> [image: http://www.apexbigdata.com/san-jose-register.html]
>>>>>>> <http://www.apexbigdata.com/san-jose-register.html>
>>>>>>>
>>>>>>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Do we need to save committedWindowId? Can't it be computed from existing
>>>>>>>> checkpoints by walking through the DAG? We probably do this anyway and I
>>>>>>>> suspect there is a minor bug somewhere in there. If an operator is
>>>>>>>> stateless you could assume its checkpoint to be long max for the sake of
>>>>>>>> computation and compute the committed window to be the lowest common
>>>>>>>> checkpoint. If they are all stateless and you end up with long max you can
>>>>>>>> start with a window id that reflects the current timestamp.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <amol@datatorrent.com> wrote:
>>>>>>>>
>>>>>>>>> CommitWindowId could be computed from the existing checkpoints. That
>>>>>>>>> solution still needs purge to be done after commitWindowId is confirmed
>>>>>>>>> to be saved in Stram state. Without this the commitWindowId computed from
>>>>>>>>> the checkpoints may have some checkpoints missing.
>>>>>>>>>
>>>>>>>>> Thks
>>>>>>>>> Amol
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> E: amol@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Can't the committedWindowId be calculated by looking at the physical plan
>>>>>>>>>> and the existing checkpoints?
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tushar@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Help needed for APEXCORE-619
>>>>>>>>>>>
>>>>>>>>>>> Issue: When an application is relaunched after a long time with stateless
>>>>>>>>>>> operators at the end of the DAG, the stateless operators start with a
>>>>>>>>>>> very high windowId. In this case the stateless operator ignores all the
>>>>>>>>>>> data received until the upstream operator catches up with it. This breaks
>>>>>>>>>>> the *at-least-once* guarantee on relaunch of the operator or when the
>>>>>>>>>>> master is killed and the application is restarted.
>>>>>>>>>>>
>>>>>>>>>>> Solutions:
>>>>>>>>>>> - Fix the windowId for stateless leaf operators from the upstream operator.
>>>>>>>>>>> But this has some issues when we have a join with two upstream operators
>>>>>>>>>>> at different windowIds. If we set the windowId to min(upstream windowId),
>>>>>>>>>>> then we need to recalculate the new recovery window ids for the upstream
>>>>>>>>>>> paths from this operator.
>>>>>>>>>>>
>>>>>>>>>>> - Another solution is to create an empty file in the checkpoint directory
>>>>>>>>>>> for stateless operators. This will help us identify the checkpoints of
>>>>>>>>>>> stateless operators during relaunch instead of computing them from the
>>>>>>>>>>> latest timestamp.
>>>>>>>>>>>
>>>>>>>>>>> - Bring the entire DAG to committedWindowId. This could be achieved by
>>>>>>>>>>> writing committedWindowId to a journal. We need to make sure that we
>>>>>>>>>>> are not purging the checkpointed state until the committedWindowId is
>>>>>>>>>>> saved in the journal.
>>>>>>>>>>>
>>>>>>>>>>> Let me know your thoughts on this and your preferred solution.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> -Tushar.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>
>>>>>
>
