apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Farkas <...@datatorrent.com>
Subject Re: Why is Async checkpointing made default?
Date Mon, 23 Nov 2015 19:17:43 GMT
Pramod,

Very good point about checkpointed being called within a window boundary.
But I still don't understand the purpose of calling checkpointed after the
asynchronous copy to HDFS is completed. It makes checkpointed
implementations more complex, and it does not provide any benefits. The
checkpointed implementation logic could be called multiple times for the
same window if the operator fails and is restored to an earlier checkpoint,
this fact invalidates any atomicity gaurantees that you are trying to
preserve. I would like a concrete example of when it is necessary to call
checkpointed after the asynchronous copy to hdfs. The only use case I can
think of is if the operator reads its own checkpointed state from hdfs in
the checkpointed call back, but this use case is not a valid use case.

Tim

On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <pramod@datatorrent.com>
wrote:

> There is a problem I see with the older way in which checkpointed was
> called as well which is relevant to this. It is called outside of window
> boundary between end window and begin window which I don't think is the
> correct as platform should not run operator business logic outside of
> window boundaries. The operator could end up sending tuples accidentally in
> this call for example.
>
> I think we need to fix that and also not assume that checkpointed callback
> will be called immediately after end window of the window being
> checkpointed.
>
> Thanks
>
> On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <chandni@datatorrent.com>
> wrote:
>
> > :-) I did not write it that semantics. Looks like someone made that
> > assumption along the way. Fix the code which assumes that.
> >
> > What  do you mean?
> > We go on fixing all the code which was written because the behavior with
> > Synchronous checkpointing was
> > beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when checkpointing
> > is aligned with application window boundary)
> >
> > This was broken recently with async checkpointing but instead of fixing
> > that, we go on fixing all the existing solutions.
> > I guess I find this ridiculous.
> >
> > And why should we make it harder and harder to write operators?
> > I think majority of people who participated in this discussion do believe
> > that we need to fix and I think this is critical.
> >
> > Chandni
> >
> >
> >
> > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > cnarsude@cisco.com> wrote:
> >
> > >
> > >
> > > On 11/23/15, 8:44 AM, "Chandni Singh" <chandni@datatorrent.com> wrote:
> > >
> > > >I think in the API, there is windowId in the checkpointed callback for
> > > >cases when checkpointing could happen within application windows.
> > > >
> > > >IMHO backward compatibility is broken. For 3 years since the platform
> > was
> > > >created the semantics of checkpointed were
> > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing
> > was
> > > >aligned with application window boundaries.
> > > >
> > >
> > > :-) I did not write it that semantics. Looks like someone made that
> > > assumption along the way. Fix the code which assumes that.
> > >
> > > ‹
> > > Chetan
> > >
> > > >Please not that aligning Checkpointing with Application Window
> boundary
> > is
> > > >the DEFAUL behavior and I think most of us agree that aligning the
> > > >checkpointing with application window boundaries is what we see in
> every
> > > >use case.
> > > >
> > > >Code was written with that assumption by committers of Apex (not even
> by
> > > >people who are new to Apex). It is broken by by a change introduced
> > couple
> > > >of months back (August 2015).
> > > >
> > > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> > delegate
> > > >the complexity to operators to handle it. Even a simple scenario that
> I
> > > >mentioned in my first mail, is very complicated.
> > > >
> > > >I think this is a big issue and we need to fix it.
> > > >
> > > >Chandni
> > > >
> > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <
> ram@datatorrent.com
> > >
> > > >wrote:
> > > >
> > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > >> windowId parameter
> > > >> leading to a natural guarantee that it matches the immediately
> > > >> preceding beginWindow().
> > > >>
> > > >> Both committed() and checkpointed() take that parameter which
> suggests
> > > >>it
> > > >> may
> > > >> lag behind the current window. The comments on those methods say
> > nothing
> > > >> that can be be interpreted as a guarantee that the windowId will
> match
> > > >>the
> > > >> window just processed. So, from the viewpoint of "original intent"
> --
> > > >> a phrase that
> > > >> has a long and storied history in jurisprudence
> > > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > > >> it seems that any code that assumed a guarantee when none existed
> must
> > > >> be regarded
> > > >> as erroneous.
> > > >>
> > > >> Having said that, we are all aware that in software it is not at all
> > > >> unusual to preserve behavior
> > > >> in the interests of backward compatibility, regardless of many other
> > > >> reasons for that behavior
> > > >> to be considered objectionable. So, if the cost of fixing all the
> code
> > > >> that makes that
> > > >> assumption is too high, we should seriously consider reverting it.
> In
> > > >> this context, the
> > > >> guarantee that checkpointed() simply means that the operator state
> has
> > > >> been serialized
> > > >> seems adequate.
> > > >>
> > > >> We have a roughly analogous situation with respect to OS write()
> > calls:
> > > >> When the call returns, we _do not_ have an assurance that the data
> has
> > > >> gone to disk.
> > > >> All we know is that the OS has copied the data to its buffers for
> > > >> flushing to disk. If we
> > > >> want to know when the actual write to disk is done, we need to use
a
> > > >> different call -- fsync().
> > > >>
> > > >> Ram
> > > >>
> > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > > >><pramod@datatorrent.com>
> > > >> wrote:
> > > >> > I think the original intent for checkpointed callback was that
it
> > can
> > > >>be
> > > >> > called anytime after checkpoint and not necessarily immediately
> > after
> > > >>the
> > > >> > window prior of checkpoint. As Gaurav mentioned the API bears
it
> > out.
> > > >> > Furthermore the callback has to be called within the lifecycle
> > > >>methods of
> > > >> > the operator so it will be called inside a window so you anyway
> have
> > > >>to
> > > >> > deal with new data anyway before the callback is called. Even
> though
> > > >> > checkpointed is not committed shouldn't it at least guarantee
that
> > the
> > > >> > operator is recoverable to that state in which case it should
be
> > > >>called
> > > >> > after the save to HDFS actually finishes.
> > > >> >
> > > >> > Thanks
> > > >> >
> > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> > gaurav@datatorrent.com
> > > >
> > > >> > wrote:
> > > >> >
> > > >> >> IMO, I don¹t think there is any backward incompatibility
wrt
> > > >> Checkpointing
> > > >> >> call back semantics because
> > > >> >>
> > > >> >> 1. The checkpointed call is only made once the operator state
is
> > > >> preserved.
> > > >> >> 2. The window ids being passed to checkpointed are in increasing
> > > >>order.
> > > >> >> 3. The window ids being passed are still the same ids as
were
> > passed
> > > >> >> earlier.
> > > >> >> 4. The sequence is still
> > begingWindow()->endWindow()->checkpointed().
> > > >> >>
> > > >> >> Thanks
> > > >> >> - Gaurav
> > > >> >>
> > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> > gaurav@datatorrent.com>
> > > >> >> wrote:
> > > >> >> >
> > > >> >> > If the requirement is that the order is always
> > > >> >> begingWindow()->endWindow()->checkpointed(), why to
pass windowId
> > in
> > > >>the
> > > >> >> checkpointed() call back?
> > > >> >> >
> > > >> >> > Thanks
> > > >> >> > - Gaurav
> > > >> >> >
> > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > > >><chandni@datatorrent.com
> > > >> >> <mailto:chandni@datatorrent.com>> wrote:
> > > >> >> >>
> > > >> >> >> FYI,
> > > >> >> >>
> > > >> >> >> HDHTWriter implementation is dependent on the older
semantics
> > and
> > > >> seems
> > > >> >> to
> > > >> >> >> be broken now.
> > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > >> >> >> In the checkpointed implementation, it copies certain
state
> > > >> (transient)
> > > >> >> and
> > > >> >> >> transfers it to a checkpointedWriteCache with respect
to
> window
> > > >>'x'.
> > > >> >> >>
> > > >> >> >> With Async checkpointing it, the state that is transferred
is
> > much
> > > >> more
> > > >> >> >> recent than window 'x'.
> > > >> >> >>
> > > >> >> >> Chandni
> > > >> >> >>
> > > >> >> >>
> > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh
<
> > > >> >> chandni@datatorrent.com <mailto:chandni@datatorrent.com>>
> > > >> >> >> wrote:
> > > >> >> >>
> > > >> >> >>> Agreed. Thomas's solution fixes the backward
> incompatibility. I
> > > >> think
> > > >> >> we
> > > >> >> >>> really need to fix this.
> > > >> >> >>>
> > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas
<
> > > >> tim@datatorrent.com
> > > >> >> <mailto:tim@datatorrent.com>>
> > > >> >> >>> wrote:
> > > >> >> >>>
> > > >> >> >>>> Gaurav,
> > > >> >> >>>>
> > > >> >> >>>> I think if the state copy fails then STRAM
should roll back
> > the
> > > >> >> operator
> > > >> >> >>>> to
> > > >> >> >>>> a checkpoint that is further back than the
last checkpoint.
> If
> > > >>you
> > > >> are
> > > >> >> >>>> saying that you want to preserve the semantic
that
> > checkpointed
> > > >>is
> > > >> >> only
> > > >> >> >>>> called after a checkpoint is completed,
I would argue that
> > that
> > > >> >> guarantee
> > > >> >> >>>> is already pointless in the current implementation
since it
> is
> > > >> always
> > > >> >> >>>> possible for an operator to be rolled back
to a checkpoint
> > > >>before
> > > >> it's
> > > >> >> >>>> last
> > > >> >> >>>> completed checkpoint. So, it is already
currently possible
> for
> > > >>some
> > > >> >> >>>> database or file operation performed after
a completed
> > > >>checkpoint
> > > >> to
> > > >> >> be
> > > >> >> >>>> redone after a failure. Because of this
I think Thomas's
> > > >>solution
> > > >> >> makes
> > > >> >> >>>> the
> > > >> >> >>>> most sense. Thomas's solution would also
address Chandni's
> > > >>original
> > > >> >> point
> > > >> >> >>>> that the semantics for the checkpointed
call back have been
> > > >> violated.
> > > >> >> >>>> There
> > > >> >> >>>> are operators in our libraries that have
depended on the
> > > >> >> beginWindow(x),
> > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence,
which is
> now
> > > >> broken.
> > > >> >> We
> > > >> >> >>>> should probably fix that.
> > > >> >> >>>>
> > > >> >> >>>> Tim
> > > >> >> >>>>
> > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav
Gupta <
> > > >> >> gaurav@datatorrent.com <mailto:gaurav@datatorrent.com>>
> > > >> >> >>>> wrote:
> > > >> >> >>>>
> > > >> >> >>>>> Thomas,
> > > >> >> >>>>>
> > > >> >> >>>>> This was done to preserve checkpointing
semantics that is
> to
> > > >>tell
> > > >> the
> > > >> >> >>>>> operator that its state is preserved.
Say if database is
> > > >>updated
> > > >> or
> > > >> >> >>>> files
> > > >> >> >>>>> are moved in checkpointed call but the
state copy fails,
> how
> > to
> > > >> >> address
> > > >> >> >>>>> such scenarios?
> > > >> >> >>>>>
> > > >> >> >>>>> Thanks
> > > >> >> >>>>> - Gaurav
> > > >> >> >>>>>
> > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas
Weise <
> > > >> thomas@datatorrent.com
> > > >> >> <mailto:thomas@datatorrent.com>>
> > > >> >> >>>>> wrote:
> > > >> >> >>>>>>
> > > >> >> >>>>>> Alternatively I would ask why the
checkpointed callback
> > needs
> > > >>to
> > > >> >> wait
> > > >> >> >>>>> until
> > > >> >> >>>>>> the data was copied to HDFS instead
upon completion of the
> > > >>state
> > > >> >> >>>>>> serialization.
> > > >> >> >>>>>>
> > > >> >> >>>>>> Thomas
> > > >> >> >>>>>>
> > > >> >> >>>>>>
> > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM,
Chandni Singh <
> > > >> >> >>>> chandni@datatorrent.com <mailto:chandni@datatorrent.com>>
> > > >> >> >>>>>> wrote:
> > > >> >> >>>>>>
> > > >> >> >>>>>>> Gaurav,
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> My question is about why Async
was made the default when
> it
> > > >> changed
> > > >> >> >>>> the
> > > >> >> >>>>>>> semantics of operator callbacks.
Your response doesn't
> > answer
> > > >> that.
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> Chandni
> > > >> >> >>>>>>>
> > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22
PM, Gaurav Gupta <
> > > >> >> >>>> gaurav@datatorrent.com <mailto:gaurav@datatorrent.com>>
> > > >> >> >>>>>>> wrote:
> > > >> >> >>>>>>>
> > > >> >> >>>>>>>> The idea behind Async checkpointing
is to unblock
> operator
> > > >> while
> > > >> >> the
> > > >> >> >>>>>>> state
> > > >> >> >>>>>>>> is getting transferred to
HDFS.
> > > >> >> >>>>>>>> Just to clarify that this
beginWindow (x) ->
> endWindow(x)
> > ->
> > > >> >> >>>>> checkpointed
> > > >> >> >>>>>>>> (x-1 ) should be an ideal
sequence, but if the HDFS is
> > slow
> > > >>or
> > > >> for
> > > >> >> >>>> some
> > > >> >> >>>>>>>> other reason transferring
the state to HDFS is slow this
> > > >> sequence
> > > >> >> >>>> may
> > > >> >> >>>>> not
> > > >> >> >>>>>>>> hold true.
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>> Can your use case be addressed
by
> > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78
<
> > > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78
<
> > > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>> Thanks
> > > >> >> >>>>>>>> - Gaurav
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>>> On Nov 22, 2015, at
3:56 PM, Chandni Singh <
> > > >> >> >>>> chandni@datatorrent.com <mailto:chandni@datatorrent.com>>
> > > >> >> >>>>>>>> wrote:
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> With Async checkpointing
the checkpoint callback in
> > > >> >> CheckpointPoint
> > > >> >> >>>>>>>>> listener is called for
a previous window, that is,
> > > >> >> >>>>>>>>> beginWindow (x) ->
endWindow(x) -> checkpointed (x-1 )
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> This feature was newly
introduced. With synchronous
> > > >> >> checkpointing,
> > > >> >> >>>> the
> > > >> >> >>>>>>>>> behavior was always
> > > >> >> >>>>>>>>> beginWindow(x) ->
endWindow(x) -> checkpointed (x)
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> A lot of operators were
written before asynchronous
> > > >> checkpointing
> > > >> >> >>>> was
> > > >> >> >>>>>>>>> introduced and few of
them can rely on the sequencing
> > > >> guaranteed
> > > >> >> by
> > > >> >> >>>>>>>>> synchronous checkpointing.
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> So why was Async Checkpointed
made default?
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> With how Async checkpoint
is today, the complexity to
> > > >>handle
> > > >> >> >>>> transient
> > > >> >> >>>>>>>>> state in checkpointed
callback falls on every operator.
> > For
> > > >> eg,
> > > >> >> >>>> lets
> > > >> >> >>>>>>> say
> > > >> >> >>>>>>>>> earlier I had a transient
map which I cleared every
> time
> > > >>the
> > > >> >> >>>>>>> checkpointed
> > > >> >> >>>>>>>>> was called, with async
checkpointing this simple task
> > will
> > > >>be
> > > >> a
> > > >> >> lot
> > > >> >> >>>>>>> more
> > > >> >> >>>>>>>>> complicated.
> > > >> >> >>>>>>>>>
> > > >> >> >>>>>>>>> I think Async checkpointing
broke the semantics of
> > operator
> > > >> >> >>>> callbacks
> > > >> >> >>>>>>> and
> > > >> >> >>>>>>>>> should NOT be the default.
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>>
> > > >> >> >>>>>>>
> > > >> >> >>>>>
> > > >> >> >>>>>
> > > >> >> >>>>
> > > >> >> >>>
> > > >> >> >>>
> > > >> >> >
> > > >> >>
> > > >> >>
> > > >>
> > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message