apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Enhance batch support - batch demarcation
Date Thu, 03 Nov 2016 06:58:33 GMT
Hi All,

Starting with the implementation, we are planning to take care of a single
batch job first. We will take up the scheduling aspect later.

The first requirement is the following:


A batch job is an Apex application which picks up data from the source and
processes it. Once the data is completely processed, the application should
detect the end of the batch and shut itself down. This will help separate the
scheduling aspect from the Apex batch job.


We have the following options to shut down the application:


   - The first option is to throw a shutdown exception from the input operator.


   - End of batch can be detected by the input operator depending on
      source-specific details.
      - Once the end of batch is detected, the operator itself can shut down
      the application by throwing a shutdown signal.
      - The problem with this approach is that even though the batch has
      ended, the downstream (output) operators might still have pending
      finalization, which is usually done in calls like committed(). Waiting
      for the committed() call in the input operator may not help either,
      since this call may happen for the output operator in a different
      window.
      - Another issue is that with multiple input operators, each may send
      the shutdown signal independently.


   - Second, allow the engine to identify whether the application is a
   batch application and if so, poll for ```isBatchDone()``` on the input
   operator until it is true. Once it returns true, we can wait for the
   committed() call and end the application via the Input operator. We can
   have an interface BatchInput which would be implemented by the Input
   Operator of a batch application.

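```
import com.datatorrent.api.InputOperator;

// Implemented by the input operator of a batch application.
public interface BatchInput extends InputOperator
{
  // Returns true once the operator has emitted all data of the batch.
  boolean isBatchDone();
}
```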

The developer implements isBatchDone() to identify when the batch has
ended. The engine could then call it to detect the end of the batch and
shut down the application.
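A minimal sketch of the engine side of the second option, assuming the engine can poll every `BatchInput` once per streaming window and knows the latest window each operator has committed. `BatchShutdownCoordinator`, `onWindow` and the plain `long` window ids are hypothetical names for illustration, not Apex APIs, and `BatchInput` here is a stand-in that does not extend `InputOperator`. Waiting until every operator has committed the window in which the inputs finished is one way to address both issues raised for the first option.

```java
import java.util.List;
import java.util.Map;

// Stand-in for the proposed interface; in Apex it would extend InputOperator.
interface BatchInput {
  boolean isBatchDone();
}

// Hypothetical engine-side logic for option 2; not an existing Apex class.
class BatchShutdownCoordinator {
  private final List<BatchInput> inputs;
  // Window in which all inputs first reported isBatchDone(), or -1 if not yet.
  private long batchDoneWindow = -1;

  BatchShutdownCoordinator(List<BatchInput> inputs) {
    this.inputs = inputs;
  }

  // Called once per streaming window with the latest committed window of
  // every operator; returns true when the application can be shut down.
  boolean onWindow(long windowId, Map<String, Long> committedWindows) {
    if (batchDoneWindow < 0) {
      for (BatchInput input : inputs) {
        if (!input.isBatchDone()) {
          return false; // some input operator still has data to emit
        }
      }
      batchDoneWindow = windowId;
    }
    // Wait until every operator, including outputs that finalize in
    // committed(), has committed a window at or after the end of the batch.
    for (long committed : committedWindows.values()) {
      if (committed < batchDoneWindow) {
        return false;
      }
    }
    return true;
  }
}
```

Because the coordinator checks all inputs together, several input operators no longer race to send independent shutdown signals.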



   - Third, use a shared Stats Listener to identify the end of a batch
   using some metric in the input operator, and remove operators via a DAG
   change request. The DAG change request can be enabled by APEXCORE-408,
   which is in progress.
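For the third option, a listener shared by all partitions of the input operator can wait until every partition reports the end of its data before requesting the DAG change. This sketch assumes each partition publishes a hypothetical `pendingUnits` metric (for example, files not yet read) and models the DAG change request as a plain list of operators to remove; `EndOfBatchListener` and `processMetric` are illustrative names, not the Apex `StatsListener` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shared listener for option 3; not the Apex StatsListener API.
class EndOfBatchListener {
  private final int expectedPartitions;
  private final List<String> operatorsToRemove;
  // Latest value of the assumed "pendingUnits" metric per input partition.
  private final Map<Integer, Long> pendingByPartition = new HashMap<>();

  EndOfBatchListener(int expectedPartitions, List<String> operatorsToRemove) {
    this.expectedPartitions = expectedPartitions;
    this.operatorsToRemove = new ArrayList<>(operatorsToRemove);
  }

  // Records one partition's metric. Returns the operators to remove through a
  // DAG change request, or an empty list while the batch is still running.
  List<String> processMetric(int partitionId, long pendingUnits) {
    pendingByPartition.put(partitionId, pendingUnits);
    if (pendingByPartition.size() < expectedPartitions) {
      return Collections.emptyList(); // not every partition has reported yet
    }
    for (long pending : pendingByPartition.values()) {
      if (pending > 0) {
        return Collections.emptyList(); // some partition still has work
      }
    }
    return new ArrayList<>(operatorsToRemove);
  }
}
```

Since the decision is made in one place from all partitions' metrics, the end of the batch is detected once, rather than once per input operator.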


Please share your suggestions.

~ Bhupesh




On Fri, Sep 16, 2016 at 2:10 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> Hi All,
>
> Resuming the discussion.
>
> After some discussion, I have created a document which captures the
> requirements and a high level design for supporting batch applications. The
> document consolidates different threads of discussion and aspects which are
> relevant to batch support.
>
> This is in no way a design document, but just captures the high level
> steps. I have tried to keep it very brief and to the point. I will keep
> refining the document depending on the comments to ultimately convert it to
> a design document.
>
> Here is the link to the document: https://docs.google.com/document/d/1qlyQJP80dOlWZeHwICMFA3D3jGG_T2NLhMfzScbuTwQ/edit?usp=sharing
>
> Please provide your valuable feedback.
>
> ~ Bhupesh
>
> On Tue, Feb 23, 2016 at 7:24 AM, David Yan <david@datatorrent.com> wrote:
>
>> For batch applications without checkpointing or iteration loops, what
>> would be the significance of streaming windows and application windows?
>>
>>
>> On Sun, Feb 14, 2016 at 10:33 PM, Thomas Weise <thomas@datatorrent.com>
>> wrote:
>>
>> > Time to resume this discussion. I think it makes sense to look at the
>> > batch as execution of a DAG from setup to teardown for all its operators,
>> > as suggested by Bhupesh and Sandeep. The DAG comes into existence when the
>> > batch begins and terminates when it is done.
>> >
>> > We have also seen from customers that there is demand for having the
>> > scheduler function built in when no external component is already
>> > present. For example, a file or set of files could be identified as a
>> > "batch". While the application is idle, only a scheduler operator runs,
>> > polling for files. Once work is ready, that operator would launch the
>> > DAG for processing (within the same application, but not connected
>> > through a stream). When processing is complete, that DAG terminates and
>> > returns the resources.
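The built-in scheduler described in this paragraph could look roughly like the following sketch: while idle, only the polling logic runs, and when new files appear it launches one batch for them. The directory listing and the DAG launch are abstracted as a `Supplier` and a `Consumer` (assumptions), and `BatchScheduler`/`poll()` are hypothetical names, not Apex APIs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical scheduler-operator logic; not an Apex API.
class BatchScheduler {
  private final Supplier<Collection<String>> listFiles; // e.g. a directory listing
  private final Consumer<List<String>> launchBatch;     // stands in for launching the batch DAG
  private final Set<String> seen = new HashSet<>();     // files already handed to a batch

  BatchScheduler(Supplier<Collection<String>> listFiles, Consumer<List<String>> launchBatch) {
    this.listFiles = listFiles;
    this.launchBatch = launchBatch;
  }

  // Polls once; returns true if a new batch was launched.
  boolean poll() {
    List<String> newFiles = new ArrayList<>();
    for (String file : listFiles.get()) {
      if (seen.add(file)) {
        newFiles.add(file);
      }
    }
    if (newFiles.isEmpty()) {
      return false; // stay idle: only the scheduler is running
    }
    Collections.sort(newFiles); // deterministic order within the batch
    launchBatch.accept(newFiles);
    return true;
  }
}
```

In a real application the launched DAG would run in the same application and return its resources once the batch completes; the sketch only shows the polling and launch decision.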
>> >
>> > As discussed, there is the need to be able to turn off checkpointing,
>> > which is different from setting a large checkpoint window. No
>> > checkpointing means no incremental recovery and hence no need to keep
>> > data in buffers.
>> >
>> > There is also the need to relay a begin/end signal through the entire
>> > DAG. This is different from setup/shutdown. It is more like
>> > begin/endWindow, but there is only a single "window" in a batch.
>> >
>> >
>> > On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar <chinmay@datatorrent.com>
>> > wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > A comment on following in your previous mails:
>> > >
>> > >
>> > >
>> > > *An operator that identifies the batch boundary tells the engine about
>> > > it and corresponding control tuples are submitted through the stream,
>> > > leading to callbacks on downstream operators*
>> > >
>> > > This would mean there will be a single boundary definition of a batch
>> > > in the application DAG.
>> > > I think we should give each operator the freedom to define what a
>> > > batch is and produce callbacks accordingly.
>> > >
>> > > With that in mind, here is a quick sketch/suggestion of how it can be
>> > > done:
>> > >
>> > > 1) The operator that needs to work on a batch can implement an
>> > > interface, let's say BatchListener.
>> > >
>> > > 2) This will have 4 methods:
>> > > *    startBatch*
>> > > *    endBatch*
>> > > *    configureBatch*
>> > > *    callAtApplicationWindowBoundary *(maybe some better name??)
>> > >
>> > > 3) *configureBatch* will define the boundary of a batch.
>> > > This will be called right after setup OR activate, basically before
>> > > the beginning of the stream. The return value will be set on the
>> > > operator thread.
>> > >
>> > > 4) Based on the configuration, *startBatch* and *endBatch* will be
>> > > called.
>> > >
>> > > 5) *callAtApplicationWindowBoundary* should return *true/false* based
>> > > on whether the start/end batch calls should happen at an application
>> > > window boundary or not. This is where the user can choose whether the
>> > > platform takes care of checkpointing tuples within a window, or
>> > > whether the user wants to do that on their own.
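One way to read steps 1 to 5 as code, assuming `configureBatch()` returns the batch boundary as a tuple count (the proposal leaves the boundary type open) and that the engine drives the callbacks. When `callAtApplicationWindowBoundary()` returns true, `endBatch()` is deferred to the end of the window in which the boundary is reached, so every batch ends on a window boundary where the platform can checkpoint. `BatchDriver` is a hypothetical stand-in for the engine, and the tuple type is simplified to `String`.

```java
import java.util.List;

// Sketch of the proposed interface; the method names follow the proposal,
// the types are assumptions.
interface BatchListener {
  long configureBatch();                     // assumed: batch size in tuples
  boolean callAtApplicationWindowBoundary(); // align start/end with windows?
  void startBatch();
  void endBatch();
}

// Hypothetical stand-in for the engine delivering windows of tuples.
class BatchDriver {
  static void run(BatchListener op, List<List<String>> windows) {
    long batchSize = op.configureBatch(); // called once, before the stream starts
    boolean aligned = op.callAtApplicationWindowBoundary();
    long count = 0;
    boolean open = false;
    for (List<String> window : windows) {
      for (String tuple : window) {
        if (!open) {
          op.startBatch();
          open = true;
        }
        count++; // the operator would process `tuple` here
        if (!aligned && count == batchSize) {
          op.endBatch(); // end immediately, possibly mid-window
          open = false;
          count = 0;
        }
      }
      if (aligned && open && count >= batchSize) {
        op.endBatch(); // deferred to the window boundary
        open = false;
        count = 0;
      }
    }
    if (open) {
      op.endBatch(); // close a final, partial batch
    }
  }
}
```

With a batch size of 2 and a single window of four tuples, the unaligned operator sees two batches, while the aligned one sees a single batch that ends with the window.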
>> > >
>> > >
>> > > Thoughts?
>> > >
>> > >
>> > > -Chinmay.
>> > >
>> > >
>> > > On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <thomas@datatorrent.com>
>> > > wrote:
>> > >
>> > > > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <sandeep@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > > > +1 for batch support in Apex. I would be interested in being part of
>> > > > > this work.
>> > > > >
>> > > > > I would like to start with the basics and would like to know how one
>> > > > > will define "batch" in the Apex context. Which of the following cases
>> > > > > would be supported under batch:
>> > > > >
>> > > > >    1. A program completes a task and automatically shuts itself down
>> > > > >    once the task is complete. E.g. the program needs to copy a set of
>> > > > >    files from source to destination.
>> > > > >    2. A program completes a task and then waits for a pre-defined time
>> > > > >    to poll for something more to work on. E.g. the program copies all
>> > > > >    the files from the source location and then periodically checks,
>> > > > >    say every 1 hour, if there are new files at the source and copies
>> > > > >    them.
>> > > > >    3. A program completes a task and then polls every 1 hr as in case 2
>> > > > >    but releases resources during the wait time.
>> > > > >
>> > > > >
>> > > >
>> > > > Yes, both, 1. and 2. are valid use cases. I would not make a further
>> > > > distinction between 2. and 3. at this point.
>> > > >
>> > > > The ability to run an application that expands and shrinks as a
>> > > > self-contained unit can be a benefit, as otherwise you need an external
>> > > > scheduler (such as Oozie) just to launch jobs. The associated extra
>> > > > integration work may be brittle and an unwanted barrier for certain use
>> > > > cases.
>> > > >
>> > > >
>> > > >
>> > > > > Needs for each of the above will vary. I am putting down some basic
>> > > > > requirements for each of them:
>> > > > >
>> > > > > 1. This case will need a mechanism to shut down automatically on
>> > > > > completion of the task.
>> > > > >
>> > > > > StartProgram()
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > > EndProgram()
>> > > > >
>> > > > > 2. This will simply need a construct to wait for some time (say 10
>> > > > > minutes) or until some time (till 1pm).
>> > > > >
>> > > > > StartProgram()
>> > > > > while(true)
>> > > > > {
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > >     WaitTill(time) or WaitFor(timeperiod)
>> > > > > }
>> > > > > EndProgram()
>> > > > >
>> > > > > 3. Apart from the wait construct, we also need support for releasing
>> > > > > resources.
>> > > > >
>> > > > > StartProgram()
>> > > > > while(true)
>> > > > > {
>> > > > >     RestartFromSavedState() // if any state is saved previously.
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > >     SaveState()
>> > > > >     ReleaseResources()
>> > > > >     WaitTill(time) or WaitFor(timeperiod)
>> > > > > }
>> > > > > EndProgram()
>> > > > >
>> > > > >
>> > > > > All the constructs, WaitTill()/WaitFor(), RestartFromSavedState(),
>> > > > > SaveState() and ReleaseResources(), could very well be part of
>> > > > > StartBatch() or EndBatch(). I have put them separately only for
>> > > > > clarity.
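Case 3 can be sketched as a runnable loop, assuming each construct in the pseudocode maps to a hook method. `BatchHooks` and `PeriodicBatchRunner` are hypothetical names, not Apex APIs; the loop runs a fixed number of rounds instead of `while(true)` so that it terminates, and the wait is injected so a caller can sleep for real or skip waiting in a test.

```java
import java.time.Duration;
import java.util.function.Consumer;

// Hypothetical hooks mirroring the case 3 pseudocode; none of these are Apex APIs.
interface BatchHooks {
  void restoreState();     // RestartFromSavedState(), no-op if nothing was saved
  void startBatch();
  void runApplication();   // streaming application starts, runs and finishes
  void endBatch();
  void saveState();
  void releaseResources();
}

class PeriodicBatchRunner {
  // Runs the batch `rounds` times; waitFor plays the role of WaitFor(timeperiod).
  static void run(BatchHooks hooks, int rounds, Duration period, Consumer<Duration> waitFor) {
    for (int i = 0; i < rounds; i++) {
      hooks.restoreState();
      hooks.startBatch();
      hooks.runApplication();
      hooks.endBatch();
      hooks.saveState();
      hooks.releaseResources();
      waitFor.accept(period); // idle until the next batch is due
    }
  }
}
```

Case 1 then corresponds to a single round with no-op state and resource hooks, and case 2 to several rounds with those hooks left empty.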
>> > > > >
>> > > > > Another point to think about is the scheduler. A batch job is
>> > > > > generally triggered as a cron job. Do we still see Apex jobs being
>> > > > > triggered by cron, or would we like to include a scheduler within Apex
>> > > > > that will trigger jobs based on time, on some external trigger, or even
>> > > > > by polling for events?
>> > > > >
>> > > > > Regards
>> > > > > Sandeep
>> > > > >
>> > > > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
>> > > > > wrote:
>> > > > >
>> > > > > > +1
>> > > > > >
>> > > > > > I think in the batch case, application windows may be transparent
>> > > > > > to the user application / operator logic. A batch can be thought of
>> > > > > > as one instantiation of an Apex DAG, from setup() to teardown() for
>> > > > > > all operators. Maybe we need to define a higher level API which
>> > > > > > encapsulates a streaming application.
>> > > > > > Something like:
>> > > > > >
>> > > > > > StartBatch()
>> > > > > >   Streaming Application starts, runs and finishes
>> > > > > > EndBatch()
>> > > > > >
>> > > > > > The streaming application will run transparently with all the
>> > > > > > windowing / checkpointing logic that it currently does. Checkpointing
>> > > > > > large amounts of data may be avoided by either checkpointing at large
>> > > > > > intervals or even disabling checkpointing for the batch job.
>> > > > > > Additionally, the external trigger (existence of some file etc.) can
>> > > > > > be controlled by the StartBatch() and EndBatch() calls. In all the
>> > > > > > batch use cases, it is usually the case that once the input is
>> > > > > > processed completely, the batch is done. Example: in MapReduce, all
>> > > > > > splits processed means the batch job is done. Similar primitives can
>> > > > > > be supported by Apex in order to facilitate the control management
>> > > > > > in the StartBatch() and EndBatch() methods.
>> > > > > >
>> > > > > > -Bhupesh
>> > > > > >
>> > > > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <thomas@datatorrent.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Following JIRA is open to enhance the support for batch:
>> > > > > > >
>> > > > > > > https://issues.apache.org/jira/browse/APEXCORE-235
>> > > > > > >
>> > > > > > > One of the challenges with batch on Apex today is that there isn't
>> > > > > > > any native support to identify the begin/end of a batch and
>> > > > > > > associate actions with it. For example, at the beginning we may
>> > > > > > > want to fetch some data needed for all subsequent processing, and
>> > > > > > > at the end perform some finalization action or push to an external
>> > > > > > > system (add a partition to a Hive table or similar).
>> > > > > > >
>> > > > > > > Absent native support, the workaround is to add a bunch of ports and
>> > > > > > > extra operators for propagation and synchronization purposes, which
>> > > > > > > makes building the batch application with standard operators, or
>> > > > > > > developing custom operators, rather difficult and inefficient.
>> > > > > > >
>> > > > > > > The span of a batch can also be seen as a user-defined window, with
>> > > > > > > logic for begin and end. The current "application window" support
>> > > > > > > is limited to a multiple of the streaming window on a per-operator
>> > > > > > > basis. In the batch case, the boundary needs to be more flexible:
>> > > > > > > user code needs to be able to determine begin/endWindow based on
>> > > > > > > external data (existence of files etc.).
>> > > > > > >
>> > > > > > > There is another commonality with the application window, and
>> > > > > > > that's alignment of checkpointing. For batches where it is more
>> > > > > > > efficient to redo the processing instead of checkpointing
>> > > > > > > potentially large amounts of intermediate state for incremental
>> > > > > > > recovery, it would be nice to be able to say "user window ==
>> > > > > > > checkpoint interval".
>> > > > > > >
>> > > > > > > This is to float the idea of having a window control that can be
>> > > > > > > influenced by user code. An operator that identifies the batch
>> > > > > > > boundary tells the engine about it, and corresponding control tuples
>> > > > > > > are submitted through the stream, leading to callbacks on downstream
>> > > > > > > operators. These control tuples should be able to carry contextual
>> > > > > > > information that can be used in downstream operator logic (file
>> > > > > > > names, schema information etc.).
>> > > > > > >
>> > > > > > > I don't expect that the current beginWindow/endWindow can be
>> > > > > > > augmented in a backward compatible way to accommodate this, but a
>> > > > > > > similar optional interface could be supported to enable batch-aware
>> > > > > > > operators and checkpointing optimization.
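The control-tuple idea combined with an optional interface might be sketched as follows, assuming the engine inserts control tuples into the stream and turns them into callbacks only for operators that opt in. `BatchAware`, `DataSink`, `ControlTuple` and `StreamDelivery` are hypothetical names for illustration, not Apex APIs, and data tuples are simplified to `String`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical optional interface: only operators that implement it get
// batch callbacks, so existing operators are unaffected.
interface BatchAware {
  void beginBatch(Map<String, String> context);
  void endBatch(Map<String, String> context);
}

// Regular data path of a downstream operator (simplified to String tuples).
interface DataSink {
  void process(String tuple);
}

// Control tuple marking a batch boundary and carrying context (file name, schema, ...).
final class ControlTuple {
  enum Kind { BEGIN_BATCH, END_BATCH }

  final Kind kind;
  final Map<String, String> context;

  ControlTuple(Kind kind, Map<String, String> context) {
    this.kind = kind;
    this.context = Collections.unmodifiableMap(new HashMap<>(context));
  }
}

// Hypothetical delivery on a stream: data tuples go to process(), control
// tuples become callbacks on batch-aware operators.
final class StreamDelivery {
  static void deliver(Object item, DataSink sink) {
    if (item instanceof ControlTuple) {
      ControlTuple ct = (ControlTuple) item;
      if (sink instanceof BatchAware) {
        BatchAware op = (BatchAware) sink;
        if (ct.kind == ControlTuple.Kind.BEGIN_BATCH) {
          op.beginBatch(ct.context);
        } else {
          op.endBatch(ct.context);
        }
      }
      // Operators that are not batch aware never see control tuples.
    } else {
      sink.process((String) item);
    }
  }
}
```

Keeping the callbacks on a separate, optional interface leaves the existing beginWindow/endWindow contract untouched, which matches the backward-compatibility concern.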
>> > > > > > >
>> > > > > > > Thoughts?
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
