apex-dev mailing list archives

From Thomas Weise <tho...@datatorrent.com>
Subject Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
Date Wed, 22 Jun 2016 21:18:16 GMT
I like the idea of keeping heavy lifting and custom code out of the master,
if possible. You find that split in responsibilities even in the case of
partitioning (the Kafka connector, for example). The change that requires
partitioning may be detected as a byproduct of regular processing in the
container, with the information relayed to the master and the action taken
there.

We should separate all the different pieces and then decide where they run:
there is detecting the need for a plan change, and then effecting the change
(which requires the full DAG view and has to be in the master).

Thomas

On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <Chandni.Singh@capitalone.com> wrote:

> We have a couple of components that already run in the master: partitioners,
> stats listeners, metrics aggregators. The problem of crashing the master
> is not specific to just the scheduler, is it?
> ________________________________
> From: Tushar Gosavi <tushar@datatorrent.com>
> Sent: Wednesday, June 22, 2016 2:32:39 PM
> To: dev@apex.apache.org
> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
> application
>
> I was thinking about avoiding running user code in the master, as a crash
> in the master takes down all containers with it. Hence I was going for the
> scheduler as an operator: a crash in the scheduler won't kill the
> application; the master can restart the scheduler, and it can start
> monitoring the job again and change the DAG when required. But this
> will require communication between the master and the scheduler for
> monitoring of operator status/stats.
>
> It is considerably easier to put the scheduling functionality in the
> master, as we have access to operator stats and there is a communication
> channel already open between the master and the operators. And a custom
> scheduler can be written as a shared stats listener, with additional APIs
> available to the listener to add/remove/deploy/undeploy operators, etc.
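The shared-stats-listener idea could be sketched roughly as below. The types here (`BatchedOperatorStats`, `Response`, the `dagChangeRequired` flag) are simplified, hypothetical stand-ins modeled on Apex's stats-listener contract, not the real `com.datatorrent.api` signatures; in a real deployment the listener would run in the master, fed by stats relayed over the heartbeat.

```java
// Simplified stand-in for the stats a listener receives per window
// (hypothetical interface, not the real Apex API).
interface BatchedOperatorStats {
  long getTuplesProcessed();
}

// Stand-in for the listener's response; the flag is analogous to hints
// like repartitionRequired: it asks the master, which owns the full DAG
// view, to effect the plan change.
class Response {
  boolean dagChangeRequired;
}

// A scheduler written as a shared stats listener: it watches operator
// stats and, after enough consecutive idle windows, requests that the
// idle DAG fragment be undeployed to free resources.
class SchedulingStatsListener {
  private final int idleThreshold;
  private int idleWindows;

  SchedulingStatsListener(int idleThreshold) {
    this.idleThreshold = idleThreshold;
  }

  Response processStats(BatchedOperatorStats stats) {
    // Count consecutive windows with no input; any activity resets it.
    idleWindows = (stats.getTuplesProcessed() == 0) ? idleWindows + 1 : 0;
    Response response = new Response();
    response.dagChangeRequired = idleWindows >= idleThreshold;
    return response;
  }
}
```

The listener itself stays small and stateless enough to live safely in the master; the heavy custom logic (deciding what to launch) can still be kept out of it.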
>
> Regards,
> - Tushar.
>
>
> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <thomas@datatorrent.com> wrote:
> > Right, if it runs in the app master and does not rely on unmanaged
> > external processes, then these requirements can be met.
> >
> > This capability seeks to avoid users having to deal with external
> > schedulers or workflows if all they want is to split a DAG that is
> > logically one application into multiple stages for resource optimization.
> > This is not very different from the need to have elasticity in terms of
> > partitions depending on the availability of input, as you point out.
> >
> >
> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <Chandni.Singh@capitalone.com> wrote:
> >
> >> Scheduling IMO belongs to the app master. Operators can influence it; for
> >> example, the file splitter can indicate that there are no more files to
> >> process.
> >>
> >> I don’t understand how that would not integrate with all the aspects:
> >> operability, fault tolerance and security.
> >>
> >> Chandni
> >>
> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chinmay@datatorrent.com> wrote:
> >>
> >> >I think it's a good idea to have a scheduling operator when you need to
> >> >start a part of the DAG when some trigger happens (e.g. FileSplitter
> >> >identifying new files in the FS) and otherwise bring it down to save
> >> >resources.
> >> >
> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <timothytiborfarkas@gmail.com> wrote:
> >> >
> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
> >> >> completely independent of a DAG or an operator. It could be used by a
> >> >> commandline tool running on your laptop, a script, or it could happen
> >> >> to be used by an Operator running in a DAG and a StatsListener.
> >> >>
> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <thomas@datatorrent.com> wrote:
> >> >>
> >> >> > Scheduling can be independent, although we have use cases where the
> >> >> > scheduling depends on completion of processing (multi-staged batch
> >> >> > jobs where unused resources need to be freed).
> >> >> >
> >> >> > Both can be accomplished with a stats listener.
> >> >> >
> >> >> > There can be a "scheduling operator" that brings up and removes DAG
> >> >> > fragments as needed.
> >> >> >
> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chandni@gmail.com> wrote:
> >> >> >
> >> >> > > Hi,
> >> >> > > IMO scheduling a job can be independent of any operator, while
> >> >> > > StatsListeners are not. I understand that in a lot of cases
> >> >> > > input/output operators will decide when the job ends, but there
> >> >> > > can be cases when scheduling can be independent of it.
> >> >> > >
> >> >> > > Thanks,
> >> >> > > Chandni
> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com> wrote:
> >> >> > >
> >> >> > > > This looks like something that coordination-wise belongs in the
> >> >> > > > master and can be done with a shared stats listener.
> >> >> > > >
> >> >> > > > The operator request/response protocol could be used to relay
> >> >> > > > the data for the scheduling decisions.
> >> >> > > >
> >> >> > > > Thomas
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <Chandni.Singh@capitalone.com> wrote:
> >> >> > > >
> >> >> > > > > Hi Tushar,
> >> >> > > > >
> >> >> > > > > I have some questions about use case 2: Batch Support.
> >> >> > > > > I don't understand the advantages of providing batch support
> >> >> > > > > by having an operator as a scheduler.
> >> >> > > > >
> >> >> > > > > An approach that seemed a little more straightforward to me
> >> >> > > > > was to expose an API for a scheduler. If there is a scheduler
> >> >> > > > > set, then the master uses it and schedules operators. By
> >> >> > > > > default there isn't any scheduler and the job is run as it is
> >> >> > > > > now.
> >> >> > > > >
> >> >> > > > > Maybe this is too simplistic, but can you please let me know
> >> >> > > > > why having an operator as a scheduler is a better way?
> >> >> > > > >
> >> >> > > > > Thanks,
> >> >> > > > > Chandni
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tushar@datatorrent.com> wrote:
> >> >> > > > >
> >> >> > > > > >Hi All,
> >> >> > > > > >
> >> >> > > > > >We have seen a few use cases in the field which require Apex
> >> >> > > > > >application scheduling based on some condition. This has
> >> >> > > > > >also come up as part of Batch Support in Apex previously
> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> >> >> > > > > >I am proposing the following functionality in Apex to help
> >> >> > > > > >scheduling and better resource utilization for batch jobs.
> >> >> > > > > >Please provide your comments.
> >> >> > > > > >Usecase 1 - Dynamic DAG modification.
> >> >> > > > > >
> >> >> > > > > >Each operator in a DAG consumes YARN resources. Sometimes it
> >> >> > > > > >is desirable to return the resources to YARN when no data is
> >> >> > > > > >available for processing, and deploy the whole DAG once data
> >> >> > > > > >starts to appear. For this to happen automatically, we will
> >> >> > > > > >need some data-monitoring operators running in the DAG to
> >> >> > > > > >trigger restart and shutdown of the operators in the DAG.
> >> >> > > > > >
> >> >> > > > > >Apex already has such an API to dynamically change the
> >> >> > > > > >running DAG through the CLI. We could provide a similar API
> >> >> > > > > >to operators which will trigger DAG modification at runtime.
> >> >> > > > > >This information can be passed to the master using the
> >> >> > > > > >heartbeat RPC, and the master will make the required changes
> >> >> > > > > >to the DAG. Let me know what you think about it; something
> >> >> > > > > >like below:
> >> >> > > > > >context.beginDagChange();
> >> >> > > > > >context.addOperator("o1");  <== launch operator from its
> >> >> > > > > >previous check-pointed state.
> >> >> > > > > >context.addOperator("o2", new Operator2());  <== create a
> >> >> > > > > >new operator.
> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >> >> > > > > >context.shutdown("o3");  <== delete this and downstream
> >> >> > > > > >operators from the DAG.
> >> >> > > > > >context.apply();  <== the DAG changes will be sent to the
> >> >> > > > > >master, and the master will apply them.
> >> >> > > > > >
> >> >> > > > > >Similarly, APIs for other functionality, such as locality
> >> >> > > > > >settings, need to be provided.
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >> >> > > > > >
> >> >> > > > > >Provide an API callable from an operator to launch a DAG.
> >> >> > > > > >The operator will prepare a DAG object and submit it to
> >> >> > > > > >YARN; the DAG will be scheduled as a new application. This
> >> >> > > > > >way complex schedulers can be written as operators.
> >> >> > > > > >
> >> >> > > > > >public class SchedulerOperator implements Operator {
> >> >> > > > > >  void handleIdleTime() {
> >> >> > > > > >    // check for conditions to start a job (for example,
> >> >> > > > > >    // enough files are available, enough items are
> >> >> > > > > >    // available in kafka, or a time has been reached)
> >> >> > > > > >    Dag dag = context.createDAG();
> >> >> > > > > >    dag.addOperator();
> >> >> > > > > >    dag.addOperator();
> >> >> > > > > >    LaunchOptions lOptions = new LaunchOptions();
> >> >> > > > > >    lOptions.oldId = ""; // start from this checkpoint.
> >> >> > > > > >    DagHandler dagHandler = context.submit(dag, lOptions);
> >> >> > > > > >  }
> >> >> > > > > >}
> >> >> > > > > >
> >> >> > > > > >DagHandler will have methods to monitor the final state of
> >> >> > > > > >the application, or to kill the DAG:
> >> >> > > > > >dagHandler.waitForCompletion();  <== wait till the DAG terminates.
> >> >> > > > > >dagHandler.status();  <== get the status of the application.
> >> >> > > > > >dagHandler.kill();  <== kill the running application.
> >> >> > > > > >dagHandler.shutdown();  <== shut down the application.
> >> >> > > > > >
> >> >> > > > > >More complex scheduler operators could be written to manage
> >> >> > > > > >workflows, i.e. DAGs of DAGs, using these APIs.
> >> >> > > > > >
> >> >> > > > > >Regards,
> >> >> > > > > >-Tushar.
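A "DAG of DAGs" driver built on the proposed DagHandler could be sketched as below. Everything here is hypothetical: `DagState`, the synchronous stand-in handler, and `runStages` follow the proposal above but none of it exists in Apex; a real `waitForCompletion()` would block on the YARN application's final state.

```java
import java.util.List;

// Hypothetical final states of a launched DAG.
enum DagState { SUCCEEDED, FAILED, KILLED }

// Stand-in for the proposed DagHandler; a real one would track a YARN app.
class DagHandler {
  private final DagState finalState;

  DagHandler(DagState finalState) { this.finalState = finalState; }

  DagState status() { return finalState; }

  // In the proposal this blocks until the DAG terminates; here it just
  // reports whether the stage ended successfully.
  boolean waitForCompletion() { return finalState == DagState.SUCCEEDED; }
}

// A workflow driver a scheduler operator could run: launch stages in
// order so each stage's resources are freed before the next starts, and
// abort the remaining stages on the first failure.
class StagedWorkflow {
  static DagState runStages(List<DagHandler> stages) {
    for (DagHandler stage : stages) {
      if (!stage.waitForCompletion()) {
        return DagState.FAILED;
      }
    }
    return DagState.SUCCEEDED;
  }
}
```

Running stages strictly in sequence is the simplest policy; a richer scheduler would walk an actual dependency graph and launch independent stages concurrently.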
> >> >> > > > >
> >> >> > > > > ________________________________________________________
> >> >> > > > >
> >> >> > > > > The information contained in this e-mail is confidential and/or
> >> >> > > > > proprietary to Capital One and/or its affiliates and may only be
> >> >> > > > > used solely in performance of work or services for Capital One.
> >> >> > > > > The information transmitted herewith is intended only for use by
> >> >> > > > > the individual or entity to which it is addressed. If the reader
> >> >> > > > > of this message is not the intended recipient, you are hereby
> >> >> > > > > notified that any review, retransmission, dissemination,
> >> >> > > > > distribution, copying or other use of, or taking of any action
> >> >> > > > > in reliance upon this information is strictly prohibited. If you
> >> >> > > > > have received this communication in error, please contact the
> >> >> > > > > sender and delete the material from your computer.
> >> >> > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
