apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Sequencing of operator calls
Date Wed, 11 Nov 2015 07:51:11 GMT
Yes, there are situations where we can do it in endWindow.
However, there are also cases where we would like to do the processing only
in the endWindow that comes just before the checkpoint. Such cases, I assume,
would benefit from https://malhar.atlassian.net/browse/APEX-78.

Thanks.
-Bhupesh
On 11-Nov-2015 1:03 pm, "Thomas Weise" <thomas@datatorrent.com> wrote:

> What you are looking for would require a guarantee that checkpointed is
> called before the next beginWindow.
>
> Why don't you record the list in endWindow? Is it a performance concern?
>
> This would then benefit from https://malhar.atlassian.net/browse/APEX-78
>
>
>
> On Tue, Nov 10, 2015 at 11:20 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > I was trying to do some processing between two checkpoint windows.
> >
> > For example, an operator deletes some files from hdfs as part of normal
> > processing within an application window. However, since hdfs deletes
> > cannot be rolled back by the platform, I am trying NOT to delete the files
> > from hdfs but just record them in a list. Now when a checkpoint happens, I
> > need to take a snapshot of the list *as of the checkpoint* and later delete
> > them asynchronously. Taking this snapshot is what I need to do between two
> > checkpoint windows. It can be done any time between the endWindow just
> > before the checkpoint and the beginWindow just after the checkpoint.
> >
> > So the question is, can this be done in the checkpointed callback?
> >
> > If the callback is asynchronous, i.e. it may be called after the next
> > window starts, then probably checkpointed is not the right place to do it.
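Thomas suggests recording the list in endWindow. Doing so can give Bhupesh a "snapshot as of the checkpoint" that does not depend on exactly when checkpointed runs. The idea is to key the pending deletes by window id and select everything up to the checkpoint's window id.

The following is a minimal sketch under several assumptions:
- The class DeferredDeleteTracker is hypothetical. Its method names only mirror the Apex callbacks, and it implements no Apex interface.
- The delete action is a plain Consumer<String>, not a real HDFS call.
- Performing the deletes on committed rather than on checkpointed is this sketch's own choice.
- The window id given to the checkpoint callbacks is assumed to be comparable with the ids seen in beginWindow. The thread notes the two need not be equal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical stand-in for an operator that defers irreversible file deletes.
// Method names mirror Apex callbacks, but nothing here is the Apex API.
class DeferredDeleteTracker {
  // Paths requested for deletion during the current window.
  private List<String> currentWindow = new ArrayList<>();
  // Paths recorded at endWindow, keyed by the window in which they were requested.
  private final TreeMap<Long, List<String>> pendingByWindow = new TreeMap<>();
  // Assumed delete action; a real operator would call its file system here.
  private final Consumer<String> deleteAction;
  private long currentWindowId;

  DeferredDeleteTracker(Consumer<String> deleteAction) {
    this.deleteAction = deleteAction;
  }

  void beginWindow(long windowId) {
    currentWindowId = windowId;
    currentWindow = new ArrayList<>();
  }

  // Instead of deleting immediately, only remember the path.
  void requestDelete(String path) {
    currentWindow.add(path);
  }

  // Record this window's requests under its window id.
  void endWindow() {
    if (!currentWindow.isEmpty()) {
      pendingByWindow.put(currentWindowId, currentWindow);
    }
  }

  // The snapshot "as of" a checkpoint is every entry with id <= checkpointWindowId,
  // so it does not matter whether this is computed before or after the next beginWindow.
  List<String> snapshotAsOf(long checkpointWindowId) {
    List<String> snapshot = new ArrayList<>();
    for (List<String> paths : pendingByWindow.headMap(checkpointWindowId, true).values()) {
      snapshot.addAll(paths);
    }
    return snapshot;
  }

  // Assumption of this sketch: deletes run only once a window is committed.
  void committed(long windowId) {
    Map<Long, List<String>> done = pendingByWindow.headMap(windowId, true);
    for (List<String> paths : done.values()) {
      paths.forEach(deleteAction);
    }
    done.clear(); // headMap is a view, so this also removes the entries from pendingByWindow
  }

  int pendingWindows() {
    return pendingByWindow.size();
  }
}
```

Because the snapshot is defined by window id rather than by the moment the callback fires, an asynchronous checkpointed call would still select the same entries.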
> > On 11-Nov-2015 11:20 am, "Thomas Weise" <thomas@datatorrent.com> wrote:
> >
> > > There does not seem to be a problem with the order of the checkpointed
> > > callbacks.
> > >
> > > I would, however, question why the callback is delayed until the write
> > > to storage is complete. From the operator's perspective, it can be
> > > called as soon as serialization is complete, which is always
> > > synchronous.
> > >
> > > As currently implemented, the checkpoint cannot be reported to StrAM
> > > until the file copy is done.
> > >
> > > Bhupesh, what problem were you looking to solve through the checkpointed
> > > callback?
> > >
> > > On Tue, Nov 10, 2015 at 3:57 PM, Chetan Narsude <chetan@apache.org> wrote:
> > >
> > > > With async checkpointing, Node.reportStats reports it back, and it
> > > > looks like the code checks the top of the queue to see whether it's
> > > > done (or does not report at all). So I do not see a reason why they
> > > > would be reported out of order.
> > > >
> > > > Chandni, I read your subsequent responses. Valid point in the last email
> > > > about documentation. It amazes me how much the documentation of our API
> > > > can be improved (myself guilty, probably the most). One comment that I
> > > > wanted to make even before that is that the semantics of copyToHdfs are
> > > > that it either succeeds or throws an exception, in which case recovery
> > > > kicks in and nothing is reported as checkpointed.
> > > >
> > > > Tim, we need the unit test, man!
> > > >
> > > > --
> > > > Chetan
> > > >
> > > >
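Chetan describes reporting that only checks the head of the queue of pending asynchronous copies, and a failed copy is never reported as checkpointed. That behaviour can be modelled as follows.

This is a simplified sketch, not Node.reportStats:
- The class AsyncCheckpointReporter and its methods are hypothetical.
- CompletableFuture stands in for the asynchronous HDFS copy.

Within the model, it also answers Ram's question: window 101 cannot be reported before 100.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of in-order checkpoint reporting with asynchronous copies.
class AsyncCheckpointReporter {
  private static final class Pending {
    final long windowId;
    final CompletableFuture<Void> copy;

    Pending(long windowId, CompletableFuture<Void> copy) {
      this.windowId = windowId;
      this.copy = copy;
    }
  }

  // Pending copies, in the order the checkpoints were taken.
  private final Deque<Pending> queue = new ArrayDeque<>();

  void checkpointStarted(long windowId, CompletableFuture<Void> copy) {
    queue.addLast(new Pending(windowId, copy));
  }

  // Only completed copies at the head of the queue are reported, so window ids
  // come out in increasing order even if a later copy finishes first.
  List<Long> drainReportable() {
    List<Long> reported = new ArrayList<>();
    while (!queue.isEmpty() && queue.peekFirst().copy.isDone()) {
      Pending head = queue.peekFirst();
      if (head.copy.isCompletedExceptionally()) {
        // A failed copy is never reported; per the thread, recovery takes over instead.
        throw new IllegalStateException("copy failed for window " + head.windowId);
      }
      queue.removeFirst();
      reported.add(head.windowId);
    }
    return reported;
  }
}
```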
> > > > On 11/10/15, 3:05 PM, "Chetan Narsude (cnarsude)" <cnarsude@cisco.com>
> > > > wrote:
> > > >
> > > > >There are a lot of things that are different when it comes to async
> > > > >checkpointing. I was evaluating it in the morning and expect that I
> > > > >will either be able to explain it or open JIRA issues. My partial
> > > > >observation is that with async checkpointing, checkpointed is not
> > > > >issued (Chandni, the last statement in the if block is "return"). I am
> > > > >digging into it, but feel free to chime in if someone else is able to
> > > > >find that.
> > > > >
> > > > >Also, I realized that my morning email applies as is to committed, but
> > > > >checkpointed has deviated a little from that. I will post the revised
> > > > >response soon.
> > > > >
> > > > >--
> > > > >Chetan
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >On 11/10/15, 2:04 PM, "Chandni Singh" <chandni@datatorrent.com> wrote:
> > > > >
> > > > >>Chetan,
> > > > >>
> > > > >>Looking at checkpoint(windowId) in Node.java, I don't think the steps
> > > > >>you mentioned are followed.
> > > > >>
> > > > >>if (using AsyncFSStorageAgent) {
> > > > >>  asyncFSStorageAgent.copyToHdfs(...);
> > > > >>}
> > > > >>operator.checkpointed(windowId);
> > > > >>
> > > > >>This means that even if copyToHdfs fails, the operator is notified that
> > > > >>the window is checkpointed.
> > > > >>
> > > > >>Are we saying that copyToHdfs will never fail with AsyncFSStorageAgent
> > > > >>for a window, since the operator is notified that the window is
> > > > >>checkpointed?
> > > > >>
> > > > >>Chandni
> > > > >>
> > > > >>On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas <tim@datatorrent.com>
> > > > >>wrote:
> > > > >>
> > > > >>> Will do
> > > > >>>
> > > > >>> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > Is there a unit test covering it? Otherwise, can you write one to test
> > > > >>> > the hypothesis?
> > > > >>> >
> > > > >>> > On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas <tim@datatorrent.com>
> > > > >>> > wrote:
> > > > >>> >
> > > > >>> > > That is what it is looking like to me. The task is submitted in
> > > > >>> > > GenericNode#checkpoint at line 504, then at the end of
> > > > >>> > > GenericNode#checkpoint, at line 531, checkpointed is called. I am
> > > > >>> > > likely missing something; I would just like to know what :)
> > > > >>> > >
> > > > >>> > > Tim
> > > > >>> > >
> > > > >>> > > On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > >>> > > wrote:
> > > > >>> > >
> > > > >>> > > > Tim,
> > > > >>> > > >
> > > > >>> > > > Are you suggesting that checkpointed is called before the checkpoint
> > > > >>> > > > is completely persisted in the storage?
> > > > >>> > > >
> > > > >>> > > > Thanks
> > > > >>> > > >
> > > > >>> > > > On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas <tim@datatorrent.com>
> > > > >>> > > > wrote:
> > > > >>> > > >
> > > > >>> > > > > Chetan,
> > > > >>> > > > >
> > > > >>> > > > > I do not see the process of reporting the checkpoint to StrAM,
> > > > >>> > > > > receiving the ack, and then calling checkpointed. The logic I'm
> > > > >>> > > > > seeing in GenericNode line 484 is that the checkpoint method is
> > > > >>> > > > > called, it spawns another thread that writes to HDFS, and then
> > > > >>> > > > > checkpointed is called immediately afterwards. I am missing
> > > > >>> > > > > something; can you give me some pointers so that I can better
> > > > >>> > > > > understand the flow?
> > > > >>> > > > >
> > > > >>> > > > > Tim
> > > > >>> > > > >
> > > > >>> > > > > On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath <ram@datatorrent.com>
> > > > >>> > > > > wrote:
> > > > >>> > > > >
> > > > >>> > > > > > Chetan's answer provides a good explanation and also clarifies
> > > > >>> > > > > > that the difference can be more than 1.
> > > > >>> > > > > >
> > > > >>> > > > > > Since checkpointing (i.e. "commit notification", as Thomas refers
> > > > >>> > > > > > to it) is asynchronous, I'm curious whether the window ids in the
> > > > >>> > > > > > checkpointed call are guaranteed to be sequential or whether they
> > > > >>> > > > > > could be out of order, i.e. can the checkpointed call see window
> > > > >>> > > > > > id 101 before it sees 100?
> > > > >>> > > > > >
> > > > >>> > > > > > Ram
> > > > >>> > > > > >
> > > > >>> > > > > > On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda
> > > > >>> > > > > > <bhupesh@datatorrent.com> wrote:
> > > > >>> > > > > > > Hi Tim,
> > > > >>> > > > > > > Thanks for the detailed explanation.
> > > > >>> > > > > > > I understand that the sequence would be
> > > > >>> > > > > > > beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow (x+1)
> > > > >>> > > > > > >
> > > > >>> > > > > > > However, when I print out the window ids in the beginWindow,
> > > > >>> > > > > > > endWindow and checkpointed calls, I see x in beginWindow and
> > > > >>> > > > > > > endWindow but x-1 in checkpointed, i.e. if the window just before
> > > > >>> > > > > > > the checkpoint is 100, I see that the checkpointed call had window
> > > > >>> > > > > > > id 99.
> > > > >>> > > > > > >
> > > > >>> > > > > > > Note: This is observed in the local mode of Apex.
> > > > >>> > > > > > >
> > > > >>> > > > > > > Thanks
> > > > >>> > > > > > > -Bhupesh
> > > > >>> > > > > > > On 10-Nov-2015 11:25 pm, "Timothy Farkas" <tim@datatorrent.com>
> > > > >>> > > > > > > wrote:
> > > > >>> > > > > > >
> > > > >>> > > > > > >> Hi Bhupesh,
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> The sequencing of the checkpointed call in relation to beginWindow
> > > > >>> > > > > > >> and endWindow depends on how your APPLICATION_WINDOW_COUNT and
> > > > >>> > > > > > >> CHECKPOINT_WINDOW_COUNT are configured. If the two WINDOW_COUNTs
> > > > >>> > > > > > >> are not configured to be the same, then it is possible that
> > > > >>> > > > > > >> checkpointed is called within an application window. So the
> > > > >>> > > > > > >> sequence of events would be this:
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> beginWindow -> checkpointed -> endWindow
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> If APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are the
> > > > >>> > > > > > >> same (default), then the sequence of calls would be this:
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> beginWindow (100) -> endWindow (100) -> checkpointed (100) ->
> > > > >>> > > > > > >> beginWindow (101)
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> The numbers in the sequence represent possible streaming window
> > > > >>> > > > > > >> ids that each call would be associated with.
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> The StateMachine that calls these callbacks for non-input
> > > > >>> > > > > > >> operators is in GenericNode.java.
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> Tim
> > > > >>> > > > > > >>
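Tim's two orderings can be reproduced with a toy model. The model walks consecutive streaming window ids, groups them into application windows of appWindowCount, and fires a checkpoint after every checkpointWindowCount streaming windows.

This is a simplification for illustration only, under these assumptions:
- It is not the GenericNode state machine.
- The class and method names are hypothetical.
- Window ids are plain consecutive numbers chosen for readability.
- Each call is labelled with the streaming window id it is associated with.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the callback order described in the thread; not GenericNode itself.
class CallbackSequenceModel {
  static List<String> sequence(long firstWindowId, int streamingWindows,
                               int appWindowCount, int checkpointWindowCount) {
    List<String> calls = new ArrayList<>();
    for (int i = 0; i < streamingWindows; i++) {
      long windowId = firstWindowId + i;
      // An application window starts every appWindowCount streaming windows.
      if (i % appWindowCount == 0) {
        calls.add("beginWindow(" + windowId + ")");
      }
      // Tuples for streaming window windowId would be processed here.
      // The application window ends on its last streaming window.
      if ((i + 1) % appWindowCount == 0) {
        calls.add("endWindow(" + windowId + ")");
      }
      // A checkpoint happens after every checkpointWindowCount streaming windows.
      if ((i + 1) % checkpointWindowCount == 0) {
        calls.add("checkpointed(" + windowId + ")");
      }
    }
    return calls;
  }
}
```

With both counts set to 1, starting at 100, the model yields beginWindow(100), endWindow(100), checkpointed(100), beginWindow(101), matching Tim's second sequence. With appWindowCount = 2 and checkpointWindowCount = 1, a checkpointed call lands between beginWindow and endWindow, as in his first sequence.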
> > > > >>> > > > > > >> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > >>> > > > > > >> wrote:
> > > > >>> > > > > > >>
> > > > >>> > > > > > >> > Hi Chetan / Community,
> > > > >>> > > > > > >> >
> > > > >>> > > > > > >> > Can someone please elaborate on why the window id supplied to
> > > > >>> > > > > > >> > CheckpointListener and the one supplied to the Operator would
> > > > >>> > > > > > >> > differ?
> > > > >>> > > > > > >> > I tried looking at the window ids of the checkpointed() and
> > > > >>> > > > > > >> > beginWindow() calls, and they differ by 1. I don't know why this
> > > > >>> > > > > > >> > should be the case.
> > > > >>> > > > > > >> >
> > > > >>> > > > > > >> > Thanks.
> > > > >>> > > > > > >> > -Bhupesh
> > > > >>> > > > > > >> >
> > > > >>> > > > > > >> > On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude <chetan@datatorrent.com>
> > > > >>> > > > > > >> > wrote:
> > > > >>> > > > > > >> >
> > > > >>> > > > > > >> > > Short answer is yes.
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> > > All the control tuples are scheduled to be delivered outside of
> > > > >>> > > > > > >> > > the window. As the checkpointed callback is triggered by the
> > > > >>> > > > > > >> > > CHECKPOINT control tuple, it will happen after endWindow and
> > > > >>> > > > > > >> > > before the next beginWindow.
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> > > The windowId supplied to CheckpointListener and the one provided
> > > > >>> > > > > > >> > > to Operator need not match, even though the sequence is defined.
> > > > >>> > > > > > >> > > So I am curious how you intend to use this knowledge.
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> > > --
> > > > >>> > > > > > >> > > Chetan
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> > >
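Chetan explains that the CHECKPOINT control tuple is delivered outside the window, between endWindow and the next beginWindow. If one thread dispatches both data and control tuples, that is why checkpointed() and beginWindow() cannot overlap.

A minimal sketch of that idea, under these assumptions:
- There is a hypothetical single-threaded dispatch loop over an in-memory queue.
- TupleKind, SketchTuple and SingleThreadDispatcher are invented for illustration and are not Apex classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical tuple kinds; Apex has its own control tuple types.
enum TupleKind { BEGIN_WINDOW, DATA, END_WINDOW, CHECKPOINT, END_STREAM }

final class SketchTuple {
  final TupleKind kind;
  final long windowId;

  SketchTuple(TupleKind kind, long windowId) {
    this.kind = kind;
    this.windowId = windowId;
  }
}

// One thread drains the queue and invokes every callback, so callbacks never overlap:
// checkpointed() returns before the next beginWindow() is even dequeued.
class SingleThreadDispatcher implements Runnable {
  private final BlockingQueue<SketchTuple> input;
  final List<String> log = new ArrayList<>();

  SingleThreadDispatcher(BlockingQueue<SketchTuple> input) {
    this.input = input;
  }

  @Override
  public void run() {
    try {
      while (true) {
        SketchTuple t = input.take();
        switch (t.kind) {
          case BEGIN_WINDOW: log.add("beginWindow(" + t.windowId + ")"); break;
          case DATA:         log.add("process"); break;
          case END_WINDOW:   log.add("endWindow(" + t.windowId + ")"); break;
          case CHECKPOINT:   log.add("checkpointed(" + t.windowId + ")"); break;
          case END_STREAM:   return;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```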
> > > > >>> > > > > > >> > > On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise <thomas@datatorrent.com>
> > > > >>> > > > > > >> > > wrote:
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> > > > It has not changed the operator execution model. State
> > > > >>> > > > > > >> > > > serialization is still synchronous; the write to HDFS is taken
> > > > >>> > > > > > >> > > > out of the operator thread.
> > > > >>> > > > > > >> > > >
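Thomas's description can be sketched as follows: serialize the state synchronously on the operator thread, and move only the write off that thread. Because the bytes are produced before the next window starts, later state changes cannot leak into the checkpoint.

The sketch rests on these assumptions:
- Java serialization to a byte array stands in for whatever serializer the storage agent actually uses.
- The writer is a hypothetical Consumer<byte[]> in place of the HDFS copy.
- The class name AsyncCheckpointSketch is invented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

class AsyncCheckpointSketch {
  // Runs on the operator thread: the state becomes bytes before this returns,
  // so the operator may mutate it as soon as the next window begins.
  static byte[] serialize(Serializable state) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(state);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  // Synchronous serialization, asynchronous write: only the copy leaves the operator thread.
  static CompletableFuture<Void> checkpoint(Serializable state, ExecutorService writerPool,
                                            Consumer<byte[]> writer) throws IOException {
    byte[] snapshot = serialize(state);
    return CompletableFuture.runAsync(() -> writer.accept(snapshot), writerPool);
  }
}
```

The returned future is what a caller would have to wait on before reporting the checkpoint as persisted. That is the gap Tim and Chandni discuss later in the thread.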
> > > > >>> > > > > > >> > > > On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre <amol@datatorrent.com>
> > > > >>> > > > > > >> > > > wrote:
> > > > >>> > > > > > >> > > >
> > > > >>> > > > > > >> > > > >
> > > > >>> > > > > > >> > > > > Sent too soon. Has asynchronous checkpointing changed this?
> > > > >>> > > > > > >> > > > >
> > > > >>> > > > > > >> > > > > Amol
> > > > >>> > > > > > >> > > > >
> > > > >>> > > > > > >> > > > > Sent from my iPhone
> > > > >>> > > > > > >> > > > >
> > > > >>> > > > > > >> > > > > > On Sep 15, 2015, at 12:38 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > >>> > > > > > >> > > > > > wrote:
> > > > >>> > > > > > >> > > > > >
> > > > >>> > > > > > >> > > > > > Hi All,
> > > > >>> > > > > > >> > > > > >
> > > > >>> > > > > > >> > > > > > Is it safe to assume that the checkpointed() and the
> > > > >>> > > > > > >> > > > > > beginWindow() calls are sequenced?
> > > > >>> > > > > > >> > > > > > In other words, are these calls made from the same thread, so
> > > > >>> > > > > > >> > > > > > that they cannot run in parallel?
> > > > >>> > > > > > >> > > > > >
> > > > >>> > > > > > >> > > > > > Thanks.
> > > > >>> > > > > > >> > > > > >
> > > > >>> > > > > > >> > > > > > --
> > > > >>> > > > > > >> > > > > > -Bhupesh
> > > > >>> > > > > > >> > > > >
> > > > >>> > > > > > >> > > >
> > > > >>> > > > > > >> > >
> > > > >>> > > > > > >> >
> > > > >>> > > > > > >>
> > > > >>> > > > > >
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>
