apex-dev mailing list archives

From Chandni Singh <chan...@datatorrent.com>
Subject Re: Sequencing of operator calls
Date Tue, 10 Nov 2015 23:20:26 GMT
Chetan,

I am pasting the code here with the relevant lines highlighted. The last
statement is NOT a 'return' if PROCESSING_MODE == EXACTLY_ONCE.


void checkpoint(long windowId)
{
  if (!context.stateless) {
    StorageAgent ba = context.getValue(OperatorContext.STORAGE_AGENT);
    if (ba != null) {
      try {
        checkpointStats = new Stats.CheckpointStats();
        checkpointStats.checkpointStartTime = System.currentTimeMillis();
        ba.save(operator, id, windowId);
        if (ba instanceof AsyncFSStorageAgent) {
          AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
          if (!asyncFSStorageAgent.isSyncCheckpoint()) {
            if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
              CheckpointHandler checkpointHandler = new CheckpointHandler();
              checkpointHandler.agent = asyncFSStorageAgent;
              checkpointHandler.operatorId = id;
              checkpointHandler.windowId = windowId;
              checkpointHandler.stats = checkpointStats;
              FutureTask<Stats.CheckpointStats> futureTask =
                  new FutureTask<Stats.CheckpointStats>(checkpointHandler);
              taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId));
              executorService.submit(futureTask);
              checkpoint = null;
              checkpointStats = null;
              return;
            } else {
              asyncFSStorageAgent.copyToHDFS(id, windowId);   // <-- highlighted
            }
          }
        }
        checkpointStats.checkpointTime =
            System.currentTimeMillis() - checkpointStats.checkpointStartTime;
      }
      catch (IOException ie) {
        try {
          logger.warn("Rolling back checkpoint {} for Operator {} due to the exception {}",
            Codec.getStringWindowId(windowId), operator, ie);
          ba.delete(id, windowId);
        }
        catch (IOException ex) {
          logger.warn("Error while rolling back checkpoint", ex);
        }
        throw new RuntimeException(ie);
      }
    }
  }

  checkpoint = new Checkpoint(windowId, applicationWindowCount, checkpointWindowCount);
  if (operator instanceof Operator.CheckpointListener) {
    ((Operator.CheckpointListener) operator).checkpointed(windowId);   // <-- highlighted
  }
}
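
The paths through this method can be traced with a small model. The sketch below is not Apex code: the class, the asyncCopy flag (standing for an AsyncFSStorageAgent whose isSyncCheckpoint() returns false) and the two-value ProcessingMode enum are hypothetical stand-ins, and it only records which steps would run. Under those assumptions it shows that the asynchronous, non-EXACTLY_ONCE path returns before checkpointed is reached within this method, while the EXACTLY_ONCE path copies first and then notifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the branches in checkpoint(windowId).
// It only records which steps would run; no storage agent or executor is involved.
public class CheckpointFlowSketch {
  // Stand-in for the processing modes compared against PROCESSING_MODE.
  enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

  static List<String> checkpoint(long windowId, boolean asyncCopy, ProcessingMode mode) {
    List<String> steps = new ArrayList<>();
    steps.add("save(" + windowId + ")");                // serialization happens first in every case
    if (asyncCopy) {                                    // AsyncFSStorageAgent with isSyncCheckpoint() == false
      if (mode != ProcessingMode.EXACTLY_ONCE) {
        steps.add("submitCopyTask(" + windowId + ")");  // HDFS copy queued on the executor
        return steps;                                   // early return: checkpointed() is not reached here
      }
      steps.add("copyToHDFS(" + windowId + ")");        // EXACTLY_ONCE: copy on the calling thread
    }
    steps.add("checkpointed(" + windowId + ")");        // listener callback at the end of the method
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(checkpoint(100, true, ProcessingMode.AT_LEAST_ONCE));
    System.out.println(checkpoint(100, true, ProcessingMode.EXACTLY_ONCE));
    System.out.println(checkpoint(100, false, ProcessingMode.AT_LEAST_ONCE));
  }
}
```

Running main prints one step list per path; only the first one, the asynchronous non-EXACTLY_ONCE path, lacks checkpointed(100).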


Chandni


On Tue, Nov 10, 2015 at 3:05 PM, Chetan Narsude (cnarsude) <cnarsude@cisco.com> wrote:

> There are a lot of things that are different when it comes to async
> checkpointing. I was evaluating it this morning and expect to either be
> able to explain it or open JIRA issues. My partial observation is that
> with async checkpointing, checkpointed is not issued (Chandni, the last
> statement in the if block is "return"). I am digging into it, but feel
> free to chime in if someone else is able to find that.
>
> Also, I realized that my morning email applies as is to committed, but
> checkpointed has deviated a little from that. I will post a revised
> response soon.
>
> --
> Chetan
>
>
>
>
> On 11/10/15, 2:04 PM, "Chandni Singh" <chandni@datatorrent.com> wrote:
>
> >Chetan,
> >
> >Looking at the checkpoint(windowId) in Node.java, I don't think the steps
> >you mentioned are followed.
> >
> >if (using AsyncFSStorageAgent) {
> >  asyncFSStorageAgent.copyToHdfs(...)
> >}
> >operator.checkpointed(windowId);
> >
> >This means that even if copyToHdfs fails, the operator is notified that
> >the window is checkpointed.
> >
> >Are we saying that copyToHdfs will never fail with AsyncFSStorageAgent for
> >a window since the operator is notified that the window is checkpointed?
> >
> >Chandni
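
On whether a failed copy can still be followed by a checkpointed notification: in the EXACTLY_ONCE branch of the method quoted at the top of this page, copyToHDFS is called inside the try block, so a failure that surfaces as an IOException (an assumption about how the copy reports errors) goes to the rollback and rethrow, and the checkpointed call at the end of the method is never reached. The hypothetical model below reproduces only that control flow; it says nothing about the asynchronous path, where the copy would fail on the executor thread instead.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the synchronous (EXACTLY_ONCE) copy inside the try block.
public class SyncCopyFailureSketch {
  static final List<String> events = new ArrayList<>();

  // Stand-in for asyncFSStorageAgent.copyToHDFS(id, windowId); fails on demand.
  static void copyToHdfs(long windowId, boolean fail) throws IOException {
    if (fail) {
      throw new IOException("simulated copy failure for window " + windowId);
    }
    events.add("copied(" + windowId + ")");
  }

  static void checkpoint(long windowId, boolean failCopy) {
    try {
      copyToHdfs(windowId, failCopy);
    } catch (IOException ie) {
      events.add("rolledBack(" + windowId + ")"); // corresponds to ba.delete(id, windowId)
      throw new RuntimeException(ie);             // propagates, so the notification below is skipped
    }
    events.add("checkpointed(" + windowId + ")");
  }

  public static void main(String[] args) {
    checkpoint(100, false);
    try {
      checkpoint(101, true);
    } catch (RuntimeException e) {
      events.add("failed(101)");
    }
    System.out.println(events);
  }
}
```

Window 100 is copied and then notified; window 101 is rolled back and the exception escapes before any checkpointed(101) is recorded.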
> >
> >On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas <tim@datatorrent.com>
> >wrote:
> >
> >> Will do
> >>
> >> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni <pramod@datatorrent.com>
> >> wrote:
> >>
> >> > Is there a unit test covering it? Otherwise, can you write one to test
> >> > the hypothesis?
> >> >
> >> > On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas <tim@datatorrent.com>
> >> > wrote:
> >> >
> >> > > That is what it looks like to me. The task is submitted in
> >> > > GenericNode#checkpoint at line 504, and then at the end of
> >> > > GenericNode#checkpoint, at line 531, checkpointed is called. I am
> >> > > likely missing something; I would just like to know what :)
> >> > >
> >> > > Tim
> >> > >
> >> > > On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni <pramod@datatorrent.com>
> >> > > wrote:
> >> > >
> >> > > > Tim,
> >> > > >
> >> > > > Are you suggesting that checkpointed is called before the checkpoint
> >> > > > is completely persisted in the storage?
> >> > > >
> >> > > > Thanks
> >> > > >
> >> > > > On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas <tim@datatorrent.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Chetan,
> >> > > > >
> >> > > > > I do not see the process of reporting the checkpoint to stram,
> >> > > > > receiving the ack, and then calling checkpointed. The logic I'm
> >> > > > > seeing in GenericNode line 484 is that the checkpoint method is
> >> > > > > called, it spawns another thread that writes to HDFS, and then
> >> > > > > checkpointed is called immediately afterwards. I must be missing
> >> > > > > something. Can you give me some pointers so that I can better
> >> > > > > understand the flow?
> >> > > > >
> >> > > > > Tim
> >> > > > >
> >> > > > > On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath <ram@datatorrent.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Chetan's answer provides a good explanation, as well as clarifying
> >> > > > > > that the difference can be more than 1.
> >> > > > > >
> >> > > > > > Since checkpointing (i.e., "commit notification", as Thomas refers
> >> > > > > > to it) is asynchronous, I'm curious whether the window ids in the
> >> > > > > > checkpointed call are guaranteed to be sequential or whether they
> >> > > > > > could be out of order, i.e., can the checkpointed call see window
> >> > > > > > id 101 before it sees 100?
> >> > > > > >
> >> > > > > > Ram
> >> > > > > >
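
Whether the window ids passed to checkpointed stay in order is the open question here. One common way to keep notifications ordered while the copies run asynchronously is to queue the pending tasks in window order and consume only from the head of the queue, which is what the hypothetical sketch below does. That GenericNode drains its taskQueue this way is an assumption, not something stated in this thread; the class and method names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: pending copy tasks queued in window order, drained only from the head.
public class InOrderDrainSketch {
  // Pairs a copy task with its window id, similar to the Pair added to taskQueue in checkpoint().
  static final class Pending {
    final FutureTask<Void> task;
    final long windowId;

    Pending(FutureTask<Void> task, long windowId) {
      this.task = task;
      this.windowId = windowId;
    }
  }

  // Removes finished entries from the head only, so a later window that finishes
  // early waits behind an earlier window that is still copying.
  static List<Long> drain(Deque<Pending> queue) {
    List<Long> notified = new ArrayList<>();
    while (!queue.isEmpty() && queue.peekFirst().task.isDone()) {
      notified.add(queue.pollFirst().windowId);
    }
    return notified;
  }

  public static void main(String[] args) {
    Deque<Pending> queue = new ArrayDeque<>();
    FutureTask<Void> copy100 = new FutureTask<>(() -> null);
    FutureTask<Void> copy101 = new FutureTask<>(() -> null);
    queue.addLast(new Pending(copy100, 100));
    queue.addLast(new Pending(copy101, 101));

    copy101.run();                      // window 101's copy finishes first
    System.out.println(drain(queue));   // prints [] because 100 is still pending
    copy100.run();
    System.out.println(drain(queue));   // prints [100, 101]
  }
}
```

With this design, out-of-order completion delays notifications but never reorders them; a queue that notified on completion instead would allow 101 before 100.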
> >> > > > > > On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda
> >> > > > > > <bhupesh@datatorrent.com> wrote:
> >> > > > > > > Hi Tim,
> >> > > > > > > Thanks for the detailed explanation.
> >> > > > > > > I understand that the sequence would be
> >> > > > > > > beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow (x+1)
> >> > > > > > >
> >> > > > > > > However, when I print out the window ids in the beginWindow,
> >> > > > > > > endWindow and checkpointed calls, I see x in beginWindow and
> >> > > > > > > endWindow, but x-1 in checkpointed.
> >> > > > > > > I.e., if the window just before the checkpoint is 100, I see that
> >> > > > > > > the checkpointed call had window id 99.
> >> > > > > > >
> >> > > > > > > Note: This is observed in the local mode of Apex.
> >> > > > > > >
> >> > > > > > > Thanks
> >> > > > > > > -Bhupesh
> >> > > > > > > On 10-Nov-2015 11:25 pm, "Timothy Farkas" <tim@datatorrent.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hi Bhupesh,
> >> > > > > > >>
> >> > > > > > >> The sequencing of the checkpointed call in relation to beginWindow
> >> > > > > > >> and endWindow depends on how your APPLICATION_WINDOW_COUNT and
> >> > > > > > >> CHECKPOINT_WINDOW_COUNT are configured. If the two WINDOW_COUNTs
> >> > > > > > >> are not configured to be the same, then it is possible that
> >> > > > > > >> checkpointed is called within an application window. The sequence
> >> > > > > > >> of events would then be:
> >> > > > > > >>
> >> > > > > > >> beginWindow -> checkpointed -> endWindow
> >> > > > > > >>
> >> > > > > > >> If APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are the
> >> > > > > > >> same (the default), then the sequence of calls would be:
> >> > > > > > >>
> >> > > > > > >> beginWindow (100) -> endWindow (100) -> checkpointed (100) -> beginWindow (101)
> >> > > > > > >>
> >> > > > > > >> The numbers in the sequence represent possible streaming window ids
> >> > > > > > >> that each call would be associated with.
> >> > > > > > >>
> >> > > > > > >> The StateMachine which calls these callbacks for non-input operators
> >> > > > > > >> is in GenericNode.java.
> >> > > > > > >>
> >> > > > > > >> Tim
> >> > > > > > >>
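
The two orderings Tim describes can be reproduced with a toy model. This is not GenericNode: it assumes streaming windows numbered 1..totalWindows, an application window spanning appWindowCount streaming windows, and a checkpoint after every checkpointWindowCount-th streaming window, taken once that streaming window has ended. Each call is tagged with the streaming window in which it happens; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of callback ordering for given application and checkpoint window counts.
public class CallbackOrderSketch {
  static List<String> callbacks(int appWindowCount, int checkpointWindowCount, int totalWindows) {
    List<String> calls = new ArrayList<>();
    for (int w = 1; w <= totalWindows; w++) {
      if ((w - 1) % appWindowCount == 0) {
        calls.add("beginWindow(" + w + ")");   // first streaming window of an application window
      }
      if (w % appWindowCount == 0) {
        calls.add("endWindow(" + w + ")");     // last streaming window of an application window
      }
      if (w % checkpointWindowCount == 0) {
        calls.add("checkpointed(" + w + ")");  // checkpoint after this streaming window
      }
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(callbacks(1, 1, 2)); // equal counts
    System.out.println(callbacks(3, 2, 3)); // unequal counts
  }
}
```

With equal counts the output follows the begin, end, checkpointed, begin pattern; with counts 3 and 2 it is beginWindow(1), checkpointed(2), endWindow(3), i.e. the checkpoint lands inside the application window. The model says nothing about which window id is passed to checkpointed, which is the separate question raised in this thread.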
> >> > > > > > >> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> >> > > > > > >> wrote:
> >> > > > > > >>
> >> > > > > > >> > Hi Chetan / Community,
> >> > > > > > >> >
> >> > > > > > >> > Can someone please elaborate on why the window id supplied to
> >> > > > > > >> > CheckpointListener and the one supplied to the Operator would
> >> > > > > > >> > differ? I looked at the window ids of the checkpointed() and
> >> > > > > > >> > beginWindow() calls, and they differ by 1. I don't know why this
> >> > > > > > >> > should be the case.
> >> > > > > > >> >
> >> > > > > > >> > Thanks.
> >> > > > > > >> > -Bhupesh
> >> > > > > > >> >
> >> > > > > > >> > On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude <chetan@datatorrent.com>
> >> > > > > > >> > wrote:
> >> > > > > > >> >
> >> > > > > > >> > > Short answer is yes.
> >> > > > > > >> > >
> >> > > > > > >> > > All the control tuples are scheduled to be delivered outside of
> >> > > > > > >> > > the window. As the checkpointed callback is triggered by the
> >> > > > > > >> > > CHECKPOINT control tuple, it will happen after endWindow and
> >> > > > > > >> > > before the next beginWindow.
> >> > > > > > >> > >
> >> > > > > > >> > > The windowId supplied to CheckpointListener and the one provided
> >> > > > > > >> > > to Operator need not match, even though the sequence is defined.
> >> > > > > > >> > > So I am curious how you intend to use this knowledge.
> >> > > > > > >> > >
> >> > > > > > >> > > --
> >> > > > > > >> > > Chetan
> >> > > > > > >> > >
> >> > > > > > >> > >
> >> > > > > > >> > > On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise <thomas@datatorrent.com>
> >> > > > > > >> > > wrote:
> >> > > > > > >> > >
> >> > > > > > >> > > > It has not changed the operator execution model. State
> >> > > > > > >> > > > serialization is still synchronous; the write to HDFS is taken
> >> > > > > > >> > > > out of the operator thread.
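
Thomas's description (synchronous serialization, HDFS write moved off the operator thread) matches the FutureTask and executorService pattern in the checkpoint code quoted at the top of this page. A minimal sketch using only java.util.concurrent; the class, the thread names "operator" and "hdfs-copier", and the stand-in "copy" (which just reports the thread it ran on) are all hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class AsyncCopySketch {
  // Returns {thread that serialized the state, thread that ran the copy}.
  static String[] run() {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "hdfs-copier"));
    try {
      // Serialization stays synchronous: it runs on the caller, i.e. the operator thread.
      String serializedOn = Thread.currentThread().getName();
      // The copy is wrapped in a FutureTask and submitted to the executor.
      FutureTask<String> copy = new FutureTask<>(() -> Thread.currentThread().getName());
      executor.submit(copy);
      // A real operator thread would move on to the next window here; we wait only to report.
      return new String[] {serializedOn, copy.get()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    Thread.currentThread().setName("operator");
    String[] threads = run();
    System.out.println("serialized on " + threads[0] + ", copied on " + threads[1]);
  }
}
```

Running main prints "serialized on operator, copied on hdfs-copier": the operator thread still pays for serialization, but not for the write.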
> >> > > > > > >> > > >
> >> > > > > > >> > > > On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre <amol@datatorrent.com>
> >> > > > > > >> > > > wrote:
> >> > > > > > >> > > >
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > Sent too soon. Has asynchronous checkpointing changed this?
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > Amol
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > Sent from my iPhone
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > > On Sep 15, 2015, at 12:38 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> >> > > > > > >> > > > > > wrote:
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > Hi All,
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > Is it safe to assume that the checkpointed() and beginWindow()
> >> > > > > > >> > > > > > calls are sequenced? In other words, are these calls made on
> >> > > > > > >> > > > > > the same thread, so that they cannot run in parallel?
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > Thanks.
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > --
> >> > > > > > >> > > > > > -Bhupesh
> >> > > > > > >> > > > >
> >> > > > > > >> > > >
> >> > > > > > >> > >
> >> > > > > > >> >
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>
>
