apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
Date Wed, 22 Jun 2016 18:32:39 GMT
I was thinking about avoiding running user code in the master, as a crash
in the master takes down all containers with it. Hence I was going for the
scheduler as an operator: a crash in the scheduler won't kill the
application, the master can restart the scheduler, and it can resume
monitoring the job and change the DAG when required. But this will
require communication between the master and the scheduler for monitoring
of operator status/stats.

It is considerably easier to put the scheduling functionality in the
master, as we have access to operator stats and there is a communication
channel already open between the master and operators. A custom scheduler
can then be written as a shared stats listener, with an additional API
available to the listener to add/remove/deploy/undeploy operators, etc.
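
A scheduler written as a shared stats listener with extra DAG-modification
hooks might look roughly like the sketch below. All of the interface and
method names here (OperatorStats, DagChangeContext, SharedStatsListener) are
hypothetical stand-ins, not the actual Apex API; only the idea is the point.

```java
// Hypothetical sketch: a scheduler written as a shared stats listener.
// None of these types exist in Apex as-is; they illustrate giving a
// listener extra DAG-modification hooks inside the app master.

import java.util.List;

interface OperatorStats {
    String operatorName();
    long tuplesProcessed();
}

// Extra API the master could expose to a privileged listener.
interface DagChangeContext {
    void undeployOperator(String name);
    void deployOperator(String name);
}

interface SharedStatsListener {
    void processStats(List<OperatorStats> stats, DagChangeContext ctx);
}

// Example scheduler: undeploy a stage once its input operator goes idle.
class IdleStageScheduler implements SharedStatsListener {
    private final String inputOperator;
    private final String stage;
    private long lastCount = -1;  // tuple count seen at the last stats report

    IdleStageScheduler(String inputOperator, String stage) {
        this.inputOperator = inputOperator;
        this.stage = stage;
    }

    @Override
    public void processStats(List<OperatorStats> stats, DagChangeContext ctx) {
        for (OperatorStats s : stats) {
            if (s.operatorName().equals(inputOperator)) {
                if (s.tuplesProcessed() == lastCount) {
                    // no progress since the last report: free the resources
                    ctx.undeployOperator(stage);
                }
                lastCount = s.tuplesProcessed();
            }
        }
    }
}
```

Because the listener runs in the master, a crash here would still hit the
master process, which is exactly the trade-off discussed above.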

Regards,
- Tushar.


On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <thomas@datatorrent.com> wrote:
> Right, if it runs in the app master and does not rely on unmanaged external
> processes, then these requirements can be met.
>
> This capability seeks to avoid users having to deal with external
> schedulers or workflows if all they want is to split a DAG that is
> logically one application into multiple stages for resource optimization.
> This is not very different from the need to have elasticity in terms of
> partitions depending on the availability of input, as you point out.
>
>
> On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
>> Scheduling IMO belongs to the App master. Operators can influence it; e.g.
>> a file splitter can indicate that there are no more files to process.
>>
>> I don't understand how that cannot integrate with all the aspects:
>> operability, fault tolerance and security.
>>
>> Chandni
>>
>> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chinmay@datatorrent.com> wrote:
>>
>> >I think it's a good idea to have a scheduling operator when you need to
>> >start a part of the DAG when some trigger happens (e.g. the FileSplitter
>> >identifying new files in FS) and otherwise bring it down to save
>> >resources.
>> >
>> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >timothytiborfarkas@gmail.com> wrote:
>> >
>> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >>completely
>> >> independent of a DAG or an operator. It could be used by a commandline
>> >>tool
>> >> running on your laptop, a script, or it could happen to be used by an
>> >> Operator running in a DAG and a StatsListener.
>> >>
>> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <thomas@datatorrent.com>
>> >> wrote:
>> >>
>> >> > Scheduling can be independent, although we have use cases where the
>> >> > scheduling depends on completion of processing (multi-staged batch
>> >>jobs
>> >> > where unused resources need to be freed).
>> >> >
>> >> > Both can be accomplished with a stats listener.
>> >> >
>> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> > fragments as needed.
>> >> >
>> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >><singh.chandni@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > > IMO scheduling a job can be independent of any operator while
>> >> > > StatsListeners are not.  I understand that in a lot of cases
>> >> input/output
>> >> > > operators will decide when the job ends, but there can be cases when
>> >> > > scheduling can be independent of it.
>> >> > >
>> >> > > Thanks,
>> >> > > Chandni
>> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com>
>> >> wrote:
>> >> > >
>> >> > > > This looks like something that coordination-wise belongs into the
>> >> > > > master and can be done with a shared stats listener.
>> >> > > >
>> >> > > > The operator request/response protocol could be used to relay the
>> >> > > > data for the scheduling decisions.
>> >> > > >
>> >> > > > Thomas
>> >> > > >
>> >> > > >
>> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >> > > >
>> >> > > > > Hi Tushar,
>> >> > > > >
>> >> > > > > I have some questions about use case 2: Batch Support.
>> >> > > > > I don't understand the advantages of providing batch support by
>> >> > > > > having an operator as a scheduler.
>> >> > > > >
>> >> > > > > An approach that seemed a little more straightforward to me was to
>> >> > > > > expose an API for a scheduler. If a scheduler is set, then the
>> >> > > > > master uses it and schedules operators. By default there isn't any
>> >> > > > > scheduler and the job runs as it does now.
>> >> > > > >
>> >> > > > > Maybe this is too simplistic, but can you please let me know why
>> >> > > > > having an operator as a scheduler is a better way?
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > > Chandni
>> >> > > > >
>> >> > > > >
>> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tushar@datatorrent.com>
>> >> > wrote:
>> >> > > > >
>> >> > > > > >Hi All,
>> >> > > > > >
>> >> > > > > >We have seen a few use cases in the field which require Apex
>> >> > > > > >application scheduling based on some condition. This has also
>> >> > > > > >come up as part of Batch Support in Apex previously
>> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>> >> > > > > >I am proposing the following functionality in Apex to help with
>> >> > > > > >scheduling and better resource utilization for batch jobs.
>> >> > > > > >Please provide your comments.
>> >> > > > > >
>> >> > > > > >Use case 1 - Dynamic DAG modification.
>> >> > > > > >
>> >> > > > > >Each operator in a DAG consumes YARN resources. Sometimes it is
>> >> > > > > >desirable to return the resources to YARN when no data is
>> >> > > > > >available for processing, and deploy the whole DAG once data
>> >> > > > > >starts to appear. For this to happen automatically, we will need
>> >> > > > > >some data monitoring operators running in the DAG to trigger
>> >> > > > > >restart and shutdown of the operators in the DAG.
>> >> > > > > >
>> >> > > > > >Apex already has such an API to dynamically change the running
>> >> > > > > >DAG through the CLI. We could provide a similar API to operators,
>> >> > > > > >which will trigger DAG modification at runtime. This information
>> >> > > > > >can be passed to the master using the heartbeat RPC, and the
>> >> > > > > >master will make the required changes to the DAG. Let me know
>> >> > > > > >what you think about it. Something like below:
>> >> > > > > >context.beginDagChange();
>> >> > > > > >context.addOperator("o1");  <== launch operator from previous
>> >> > > > > >check-pointed state.
>> >> > > > > >context.addOperator("o2", new Operator2());  <== create new
>> >> > > > > >operator.
>> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >> > > > > >context.shutdown("o3");  <== delete this and downstream operators
>> >> > > > > >from the DAG.
>> >> > > > > >context.apply();  <== dag changes will be sent to the master, and
>> >> > > > > >the master will apply these changes.
>> >> > > > > >
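
The begin/apply pattern above amounts to batching DAG mutations locally and
shipping them to the master as one atomic request (e.g. piggybacked on the
heartbeat RPC). A minimal sketch of that transaction pattern follows; the
class and method names are hypothetical, not the actual Apex API:

```java
// Hypothetical sketch of the beginDagChange()/apply() transaction pattern:
// mutations are queued locally and released as one batch on apply(), so the
// master sees either all of the changes or none of them.

import java.util.ArrayList;
import java.util.List;

class DagChangeTransaction {
    private final List<String> pendingChanges = new ArrayList<>();
    private boolean open = false;

    void begin() {
        pendingChanges.clear();
        open = true;
    }

    void addOperator(String name) {
        require();
        pendingChanges.add("ADD " + name);
    }

    void shutdownOperator(String name) {
        require();
        pendingChanges.add("SHUTDOWN " + name);
    }

    // Returns the batch that would be sent to the master atomically.
    List<String> apply() {
        require();
        open = false;
        return new ArrayList<>(pendingChanges);
    }

    private void require() {
        if (!open) throw new IllegalStateException("no transaction in progress");
    }
}
```

Mutating outside begin()/apply() fails fast, which keeps partial DAG edits
from ever reaching the master.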
>> >> > > > > >Similarly, APIs for other functionality, such as locality
>> >> > > > > >settings, need to be provided.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >Use case 2 - Classic Batch Scheduling.
>> >> > > > > >
>> >> > > > > >Provide an API callable from an operator to launch a DAG. The
>> >> > > > > >operator will prepare a DAG object and submit it to YARN; the
>> >> > > > > >DAG will be scheduled as a new application. This way complex
>> >> > > > > >schedulers can be written as operators.
>> >> > > > > >
>> >> > > > > >public class SchedulerOperator implements Operator {
>> >> > > > > >  void handleIdleTime() {
>> >> > > > > >    // check for conditions to start a job (for example enough
>> >> > > > > >    // files available, enough items available in kafka, or a
>> >> > > > > >    // scheduled time has been reached)
>> >> > > > > >    Dag dag = context.createDAG();
>> >> > > > > >    dag.addOperator();
>> >> > > > > >    dag.addOperator();
>> >> > > > > >    LaunchOptions lOptions = new LaunchOptions();
>> >> > > > > >    lOptions.oldId = ""; // start from this checkpoint.
>> >> > > > > >    DagHandler dagHandler = context.submit(dag, lOptions);
>> >> > > > > >  }
>> >> > > > > >}
>> >> > > > > >
>> >> > > > > >DagHandler will have methods to monitor the final state of the
>> >> > > > > >application, or to kill the DAG:
>> >> > > > > >dagHandler.waitForCompletion()  <== wait till the DAG terminates.
>> >> > > > > >dagHandler.status()  <== get the status of the application.
>> >> > > > > >dagHandler.kill()  <== kill the running application.
>> >> > > > > >dagHandler.shutdown()  <== shut down the application.
>> >> > > > > >
>> >> > > > > >More complex scheduler operators could be written to manage
>> >> > > > > >workflows, i.e. DAGs of DAGs, using these APIs.
>> >> > > > > >
>> >> > > > > >Regards,
>> >> > > > > >-Tushar.
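
The "DAG of DAGs" idea in the proposal above could be sketched as a driver
that submits one DAG at a time and waits on its handle before launching the
next. The DagHandle type and status strings below are hypothetical stand-ins
for the proposed submit()/DagHandler API, not anything that exists in Apex:

```java
// Hypothetical sketch of a sequential "DAG of DAGs" workflow driver built
// on the proposed submit()/DagHandler-style API. Only the chaining logic
// is the point; the Apex-facing types are illustrative.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

interface DagHandle {
    void waitForCompletion();  // block until the launched DAG terminates
    String status();           // final state, e.g. "SUCCEEDED" or "FAILED"
}

class SequentialWorkflow {
    // stage name -> launcher that submits one DAG and returns its handle
    private final Map<String, Supplier<DagHandle>> stages = new LinkedHashMap<>();

    SequentialWorkflow addStage(String name, Supplier<DagHandle> launcher) {
        stages.put(name, launcher);
        return this;
    }

    // Run stages in order; stop as soon as one does not succeed.
    String run() {
        for (Map.Entry<String, Supplier<DagHandle>> e : stages.entrySet()) {
            DagHandle h = e.getValue().get();
            h.waitForCompletion();
            if (!"SUCCEEDED".equals(h.status())) {
                return "failed at " + e.getKey();
            }
        }
        return "all stages succeeded";
    }
}
```

A fan-out workflow (a true DAG of DAGs) would extend this with dependency
tracking between stages, but the waitForCompletion()/status() contract stays
the same.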
>> >> > > > >
>> >> > > > > ________________________________________________________
>> >> > > > >
>> >> > > > > The information contained in this e-mail is confidential and/or
>> >> > > > > proprietary to Capital One and/or its affiliates and may only be
>> >> > > > > used solely in performance of work or services for Capital One.
>> >> > > > > The information transmitted herewith is intended only for use by
>> >> > > > > the individual or entity to which it is addressed. If the reader
>> >> > > > > of this message is not the intended recipient, you are hereby
>> >> > > > > notified that any review, retransmission, dissemination,
>> >> > > > > distribution, copying or other use of, or taking of any action in
>> >> > > > > reliance upon this information is strictly prohibited. If you
>> >> > > > > have received this communication in error, please contact the
>> >> > > > > sender and delete the material from your computer.
>> >> > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>>
>>
