apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Sequencing of operator calls
Date Wed, 11 Nov 2015 07:20:45 GMT
I was trying to do some processing between two checkpoint windows.

For example, an operator deletes some files from HDFS as part of normal
processing within an application window. Since HDFS deletes cannot be
rolled back by the platform, I am trying NOT to delete the files right away
but to record them in a list instead. When a checkpoint happens, I need to
take a snapshot of the list *as of the checkpoint* and delete the files in
it asynchronously later. Taking this snapshot is what I need to do between
two checkpoint windows. It can be done at any time between the endWindow
just before the checkpoint and the beginWindow just after the checkpoint.

So the question is, can this be done in the checkpointed callback?

If the callback is asynchronous, i.e. it may be called after the next
window starts, then checkpointed is probably not the right place to do it.
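
To make the idea concrete, here is a rough sketch of the bookkeeping I have
in mind. It is only an illustration under assumptions: DeferredDeleteTracker
and all of its method names are hypothetical and not part of Apex, the object
is assumed to be part of the operator's checkpointed state, and the snapshot
is taken in endWindow (which runs on the operator thread) so that it does not
depend on when the checkpointed callback fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not an Apex class: remembers files whose deletion is
// deferred, grouped by the window in which the deletion was requested.
class DeferredDeleteTracker {
    // Paths recorded during the window currently being processed.
    private List<String> current = new ArrayList<>();
    // Snapshots keyed by window id, kept in ascending window order.
    private final TreeMap<Long, List<String>> pendingByWindow = new TreeMap<>();
    private long currentWindowId;

    void beginWindow(long windowId) {
        currentWindowId = windowId;
    }

    // Called from normal processing instead of deleting the file right away.
    void recordForDeletion(String path) {
        current.add(path);
    }

    // Snapshot the list at the window boundary, before any later window starts.
    void endWindow() {
        if (!current.isEmpty()) {
            pendingByWindow.put(currentWindowId, current);
            current = new ArrayList<>();
        }
    }

    // Returns and forgets every path recorded in a window <= windowId.
    // The caller is expected to hand the result to a background deleter.
    List<String> release(long windowId) {
        List<String> ready = new ArrayList<>();
        Map<Long, List<String>> done = pendingByWindow.headMap(windowId, true);
        for (List<String> paths : done.values()) {
            ready.addAll(paths);
        }
        done.clear(); // headMap is a view, so this removes the released entries
        return ready;
    }
}
```

An operator could call release(windowId) from whichever callback turns out to
be safe (checkpointed or committed) and pass the returned paths to a separate
thread that performs the actual deletes. Because entries are released by
window id, it would not matter if the id passed to the callback lags behind
the latest window seen in beginWindow.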
On 11-Nov-2015 11:20 am, "Thomas Weise" <thomas@datatorrent.com> wrote:

> There does not seem to be a problem with the order of the checkpointed
> callbacks.
>
> I would however question why the callback is delayed until the write to
> storage is complete. From the operator's perspective, it can be called as
> soon as serialization is complete, which is always synchronous.
>
> The checkpoint cannot be reported to StrAM until the file copy is done, as
> currently implemented.
>
> Bhupesh, what problem were you looking to solve through the checkpointed
> callback?
>
>
>
>
> On Tue, Nov 10, 2015 at 3:57 PM, Chetan Narsude <chetan@apache.org> wrote:
>
> > With async checkpointing, Node.reportStats reports the checkpoint back, and
> > it looks like the code checks the top of the queue to see whether it is
> > done (or does not report at all). So I do not see a reason why checkpoints
> > would be reported out of order.
> >
> > Chandni, I read your subsequent responses. Valid point in the last email
> > about documentation. It amazes me how much the documentation of our API
> > can be improved (I am probably the most guilty of this myself). One comment
> > that I wanted to make even before that is that the semantics of copyToHdfs
> > are that it either succeeds or throws an exception, in which case recovery
> > kicks in and nothing is reported as checkpointed.
> >
> > Tim, we need the unit test, man!
> >
> > —
> > Chetan
> >
> >
> > On 11/10/15, 3:05 PM, "Chetan Narsude (cnarsude)" <cnarsude@cisco.com>
> > wrote:
> >
> > >There are a lot of things which are different when it comes to async
> > >checkpointing. I was evaluating it in the morning and expect that I will
> > >either be able to explain them or open JIRA issues. My partial observation
> > >is that with async checkpointing, checkpointed is not issued (Chandni, the
> > >last statement in the if block is "return"). I am digging into it but
> > >feel free to chime in if someone else is able to find that.
> > >
> > >Also I realized that my morning email applies as it is to committed but
> > >checkpointed has deviated a little bit from that. Will post the revised
> > >response soon.
> > >
> > >--
> > >Chetan
> > >
> > >
> > >
> > >
> > >On 11/10/15, 2:04 PM, "Chandni Singh" <chandni@datatorrent.com> wrote:
> > >
> > >>Chetan,
> > >>
> > >>Looking at the checkpoint(windowId) in Node.java, I don't think the steps
> > >>you mentioned are followed.
> > >>
> > >>*if (using AsyncFSStorageAgent)  {*
> > >>*  asyncFSStorageAgent.copyToHdfs(...)*
> > >>*}*
> > >>*operator.checkpointed(windowId);*
> > >>
> > >>This means that even if copyToHdfs fails, the operator is notified that
> > >>the window is checkpointed.
> > >>
> > >>Are we saying that copyToHdfs will never fail for a window with
> > >>AsyncFSStorageAgent, since the operator is notified that the window is
> > >>checkpointed?
> > >>
> > >>Chandni
> > >>
> > >>On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas <tim@datatorrent.com>
> > >>wrote:
> > >>
> > >>> Will do
> > >>>
> > >>> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni <pramod@datatorrent.com>
> > >>> wrote:
> > >>>
> > >>> > Is there a unit test covering it? Otherwise, can you write one to test
> > >>> > the hypothesis?
> > >>> >
> > >>> > On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas <tim@datatorrent.com>
> > >>> > wrote:
> > >>> >
> > >>> > > That is what it is looking like to me. The task is submitted at
> > >>> > > GenericNode#checkpoint line 504, then at the end of
> > >>> > > GenericNode#checkpoint, at line 531, checkpointed is called. I am
> > >>> > > likely missing something, just would like to know what :)
> > >>> > >
> > >>> > > Tim
> > >>> > >
> > >>> > > On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni <pramod@datatorrent.com>
> > >>> > > wrote:
> > >>> > >
> > >>> > > > Tim,
> > >>> > > >
> > >>> > > > Are you suggesting that checkpointed is called before the
> > >>> > > > checkpoint is completely persisted in the storage?
> > >>> > > >
> > >>> > > > Thanks
> > >>> > > >
> > >>> > > > On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas <tim@datatorrent.com>
> > >>> > > > wrote:
> > >>> > > >
> > >>> > > > > Chetan,
> > >>> > > > >
> > >>> > > > > I do not see the process of reporting the checkpoint to stram,
> > >>> > > > > receiving the ack, and then calling checkpointed. The logic I'm
> > >>> > > > > seeing in GenericNode line 484 is that the checkpoint method is
> > >>> > > > > called, it spawns another thread that writes to hdfs, and then
> > >>> > > > > checkpointed is immediately called afterwards. I am missing
> > >>> > > > > something, can you give me some pointers so that I can better
> > >>> > > > > understand the flow?
> > >>> > > > >
> > >>> > > > > Tim
> > >>> > > > >
> > >>> > > > > On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath <ram@datatorrent.com>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Chetan's answer provides a good explanation as well as clarifying
> > >>> > > > > > that the difference can be more than 1.
> > >>> > > > > >
> > >>> > > > > > Since checkpointing (i.e. "commit notification" as Thomas refers to
> > >>> > > > > > it) is asynchronous, I'm curious about whether the window ids in
> > >>> > > > > > the checkpointed call are guaranteed to be sequential or if they
> > >>> > > > > > could be out of order, i.e. can the checkpointed call see window
> > >>> > > > > > id 101 before it sees 100?
> > >>> > > > > >
> > >>> > > > > > Ram
> > >>> > > > > >
> > >>> > > > > > On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda
> > >>> > > > > > <bhupesh@datatorrent.com> wrote:
> > >>> > > > > > > Hi Tim,
> > >>> > > > > > > Thanks for the detailed explanation.
> > >>> > > > > > > I understand that the sequence would be
> > >>> > > > > > > beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow (x+1)
> > >>> > > > > > >
> > >>> > > > > > > However, when I print out the window ids in the beginWindow, endWindow
> > >>> > > > > > > and checkpointed calls, I see x for beginWindow and endWindow but x-1
> > >>> > > > > > > for checkpointed. I.e. if the window just before the checkpoint is 100,
> > >>> > > > > > > the checkpointed call has window id 99.
> > >>> > > > > > >
> > >>> > > > > > > Note: This is observed in the local mode of Apex.
> > >>> > > > > > >
> > >>> > > > > > > Thanks
> > >>> > > > > > > -Bhupesh
> > >>> > > > > > > On 10-Nov-2015 11:25 pm, "Timothy Farkas" <tim@datatorrent.com> wrote:
> > >>> > > > > > >
> > >>> > > > > > >> Hi Bhupesh,
> > >>> > > > > > >>
> > >>> > > > > > >> The sequencing of the checkpointed call in relation to beginWindow and
> > >>> > > > > > >> endWindow depends on how your APPLICATION_WINDOW_COUNT and
> > >>> > > > > > >> CHECKPOINT_WINDOW_COUNT are configured. If the two WINDOW_COUNTs are
> > >>> > > > > > >> not configured to be the same, then it is possible that checkpointed
> > >>> > > > > > >> is called within an application window. So the sequence of events
> > >>> > > > > > >> would be this:
> > >>> > > > > > >>
> > >>> > > > > > >> beginWindow -> checkpointed -> endWindow
> > >>> > > > > > >>
> > >>> > > > > > >> If the APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are the
> > >>> > > > > > >> same (the default), then the sequence of calls would be this:
> > >>> > > > > > >>
> > >>> > > > > > >> beginWindow (100) -> endWindow (100) -> checkpointed (100) -> beginWindow (101)
> > >>> > > > > > >>
> > >>> > > > > > >> The numbers in the sequence represent possible streaming window ids
> > >>> > > > > > >> that each call would be associated with.
> > >>> > > > > > >>
> > >>> > > > > > >> The StateMachine which calls these callbacks for non-input operators
> > >>> > > > > > >> is in GenericNode.java.
> > >>> > > > > > >>
> > >>> > > > > > >> Tim
> > >>> > > > > > >>
> > >>> > > > > > >> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >>> > > > > > >> wrote:
> > >>> > > > > > >>
> > >>> > > > > > >> > Hi Chetan / Community,
> > >>> > > > > > >> >
> > >>> > > > > > >> > Can someone please elaborate on why the window id supplied to
> > >>> > > > > > >> > CheckpointListener and the Operator would differ?
> > >>> > > > > > >> > I tried looking at the window ids of the checkpointed() and the
> > >>> > > > > > >> > beginWindow() calls and they differ by 1. I don't know why this
> > >>> > > > > > >> > should be the case.
> > >>> > > > > > >> >
> > >>> > > > > > >> > Thanks.
> > >>> > > > > > >> > -Bhupesh
> > >>> > > > > > >> >
> > >>> > > > > > >> > On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude <chetan@datatorrent.com>
> > >>> > > > > > >> > wrote:
> > >>> > > > > > >> >
> > >>> > > > > > >> > > Short answer is yes.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > All the control tuples are scheduled to be delivered outside of the
> > >>> > > > > > >> > > window. As the checkpointed callback is triggered by the CHECKPOINT
> > >>> > > > > > >> > > control tuple, it will happen after endWindow and before the next
> > >>> > > > > > >> > > beginWindow.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > The windowId supplied to CheckpointListener and the one provided to
> > >>> > > > > > >> > > Operator need not match even though the sequence is defined. So I am
> > >>> > > > > > >> > > curious how you intend to use this knowledge.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > --
> > >>> > > > > > >> > > Chetan
> > >>> > > > > > >> > >
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise <thomas@datatorrent.com>
> > >>> > > > > > >> > > wrote:
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > > It has not changed the operator execution model. State serialization
> > >>> > > > > > >> > > > is still synchronous; the write to HDFS is taken out of the operator
> > >>> > > > > > >> > > > thread.
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > > > On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre <amol@datatorrent.com>
> > >>> > > > > > >> > > > wrote:
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > > > >
> > >>> > > > > > >> > > > > Sent too soon. Has asynchronous checkpointing changed this?
> > >>> > > > > > >> > > > >
> > >>> > > > > > >> > > > > Amol
> > >>> > > > > > >> > > > >
> > >>> > > > > > >> > > > > Sent from my iPhone
> > >>> > > > > > >> > > > >
> > >>> > > > > > >> > > > > > On Sep 15, 2015, at 12:38 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >>> > > > > > >> > > > > > wrote:
> > >>> > > > > > >> > > > > >
> > >>> > > > > > >> > > > > > Hi All,
> > >>> > > > > > >> > > > > >
> > >>> > > > > > >> > > > > > Is it safe to assume that the checkpointed() and the beginWindow()
> > >>> > > > > > >> > > > > > calls are sequenced?
> > >>> > > > > > >> > > > > > In other words, are these calls part of the same thread and may not
> > >>> > > > > > >> > > > > > run in parallel?
> > >>> > > > > > >> > > > > >
> > >>> > > > > > >> > > > > > Thanks.
> > >>> > > > > > >> > > > > >
> > >>> > > > > > >> > > > > > --
> > >>> > > > > > >> > > > > > -Bhupesh
> > >>> > > > > > >> > > > >
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >
> >
> >
> >
>
