apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
Date Tue, 24 Jan 2017 11:17:06 GMT
Hi All,

I have updated the design document as per the review comments on the pull
request (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx).
Please provide your suggestions.

- Tushar.


On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tushar@datatorrent.com> wrote:
> Hi All,
>
> I have opened a review pull request for dynamic dag modification
> through stats listener (https://github.com/apache/apex-core/pull/393).
> Please review and provide
> comments/suggestions.
>
> It provides the following functionality:
> - StatsListener can access the operator name, making it easy to detect
> which operator's stats are being processed.
> - StatsListener can create an instance of an object through which it can
> submit DAG modifications to the engine.
> - StatsListener can return DAG changes as a response to the engine.
> - PlanModifier is modified to take a DAG, apply it to the existing
> running DAG, and deploy the changes.
>
> The following functionality is not working yet:
>
> - The new operator does not start from the correct windowId
> (https://issues.apache.org/jira/browse/APEXCORE-532).
> - A relaunched application fails to start if it was killed after a
> dynamic DAG modification.
> - There is no support for resuming operators from their previous state
> after they were removed. This could be achieved by
>   reading state from external storage on setup.
> - Persist operator support is not present for newly added streams.
>
> The demo application using the feature is available at
> https://github.com/tushargosavi/apex-dynamic-scheduling
>
> There are two variations of the WordCount application. The first variation
> detects the presence of new files and starts a disconnected DAG to
> process the data.
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
>
> The second application
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
> starts with a reader operator and provides pendingFiles as an auto-metric
> to the stats listener. On detecting pending files, it attaches the splitter,
> counter and output operators to the reader operator. Once the files are
> processed, the splitter, counter and output operators are removed, and
> they are added back again if new data files are added to the directory.
>
> Regards,
> -Tushar.
>
>
> On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tushar@datatorrent.com> wrote:
>> Hi All,
>>
>> I was able to prototype a simple word count application, which
>> starts with just a single file reader operator. The file reader operator
>> emits pendingFiles as a metric to the StatsListener. The StatsListener
>> changes the DAG once enough files are available: the listener returns a
>> plan change that adds word splitter, counter and console operators to the
>> reader and completes the DAG for word count.
>>
>> After 120 windows of inactivity, the three operators are removed
>> from the DAG again. When a new set of files is added, these operators are
>> added back again.
>>
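The attach/remove cycle described above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the prototype's code: PlanAction and AttachDetachPolicy are hypothetical names, the "enough files" threshold is a made-up parameter, and "inactivity" is assumed to mean a window with zero pending files. Only the 120-window figure comes from the message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decision a listener could hand back to the engine; not an Apex type. */
enum PlanAction { NONE, ATTACH, DETACH }

/** Tracks pending files and idle windows for a single reader operator. */
class AttachDetachPolicy {
  private final int minPendingFiles; // assumed "enough files" threshold
  private final int maxIdleWindows;  // 120 in the message above
  private boolean attached = false;
  private int idleWindows = 0;

  AttachDetachPolicy(int minPendingFiles, int maxIdleWindows) {
    this.minPendingFiles = minPendingFiles;
    this.maxIdleWindows = maxIdleWindows;
  }

  /** Called once per window with the reader's pendingFiles metric. */
  PlanAction onWindow(int pendingFiles) {
    if (!attached) {
      if (pendingFiles >= minPendingFiles) {
        attached = true;
        idleWindows = 0;
        return PlanAction.ATTACH; // add splitter, counter and output
      }
      return PlanAction.NONE;
    }
    if (pendingFiles > 0) {
      idleWindows = 0; // any pending work resets the idle counter
      return PlanAction.NONE;
    }
    idleWindows++;
    if (idleWindows >= maxIdleWindows) {
      attached = false;
      idleWindows = 0;
      return PlanAction.DETACH; // remove the three downstream operators
    }
    return PlanAction.NONE;
  }
}

class AttachDetachDemo {
  public static void main(String[] args) {
    AttachDetachPolicy policy = new AttachDetachPolicy(3, 120);
    List<PlanAction> actions = new ArrayList<>();
    actions.add(policy.onWindow(1)); // too few files
    actions.add(policy.onWindow(5)); // enough files
    for (int i = 0; i < 120; i++) {
      actions.add(policy.onWindow(0)); // idle windows
    }
    System.out.println(actions.get(0) + " " + actions.get(1) + " "
        + actions.get(actions.size() - 1));
  }
}
```

Keeping the decision separate from the listener makes the thresholds easy to test without a running cluster; a real StatsListener would translate ATTACH and DETACH into plan changes.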
>> The high level proposal document:
>> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>>
>> The prototype code is at :
>> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> The Application files are
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>>
>> Please provide your feedback.
>>
>> Some challenges yet to be resolved are:
>> - Restoring operator state from a previously removed operator.
>> - Handling concurrent modifications to the DAG from multiple StatsListeners.
>> - Making DAG changes persistent: the user should be able to restart the
>> application if it was killed with a modified DAG.
>>
>> Thanks,
>> -Tushar.
>>
>> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tushar@datatorrent.com> wrote:
>>> Hi All,
>>>
>>> I have done an initial prototype which allows a stats listener to
>>> specify DAG changes; the DAG changes are applied asynchronously.
>>>
>>> The changes involved are:
>>> - Add a DAGChangeSet object, which inherits from DAG and supports
>>>   methods to remove operators and streams.
>>>
>>> - The stats listener returns this object in its Response, and the platform
>>> applies the changes specified in the response to the DAG.
>>>
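To make the change-set idea concrete, here is a minimal sketch of a plan fragment that also records removals and is validated before it is applied. ToyPlan and ToyChangeSet are hypothetical stand-ins written for this illustration; they are not the Apex DAG or the prototype's DAGChangeSet, and the validation rule (every stream endpoint must still exist) is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a logical plan: operator names plus streams (id -> {source, sink}). */
class ToyPlan {
  final Set<String> operators = new LinkedHashSet<>();
  final Map<String, String[]> streams = new LinkedHashMap<>();

  void addOperator(String name) { operators.add(name); }

  void addStream(String id, String source, String sink) {
    streams.put(id, new String[] {source, sink});
  }
}

/** Hypothetical change set: a plan fragment to add, plus names to remove. */
class ToyChangeSet extends ToyPlan {
  final Set<String> removedOperators = new LinkedHashSet<>();
  final Set<String> removedStreams = new LinkedHashSet<>();

  void removeOperator(String name) { removedOperators.add(name); }
  void removeStream(String id) { removedStreams.add(id); }

  /** Computes the resulting plan, validates it, and only then mutates the running plan. */
  void applyTo(ToyPlan running) {
    Set<String> ops = new LinkedHashSet<>(running.operators);
    ops.removeAll(removedOperators);
    ops.addAll(operators);

    Map<String, String[]> result = new LinkedHashMap<>(running.streams);
    result.keySet().removeAll(removedStreams);
    result.putAll(streams);

    // Reject the change if any remaining stream points at a missing operator.
    for (Map.Entry<String, String[]> e : result.entrySet()) {
      for (String endpoint : e.getValue()) {
        if (!ops.contains(endpoint)) {
          throw new IllegalStateException(
              "stream " + e.getKey() + " references missing operator " + endpoint);
        }
      }
    }
    running.operators.clear();
    running.operators.addAll(ops);
    running.streams.clear();
    running.streams.putAll(result);
  }
}

class ToyChangeSetDemo {
  public static void main(String[] args) {
    ToyPlan running = new ToyPlan();
    running.addOperator("Reader");

    ToyChangeSet extend = new ToyChangeSet();
    extend.addOperator("Splitter");
    extend.addStream("s1", "Reader", "Splitter");
    extend.applyTo(running);
    System.out.println(running.operators + " " + running.streams.keySet());
  }
}
```

Validating on a copy before touching the running plan means a rejected change leaves the plan exactly as it was, which is one simple way to keep asynchronous application safe.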
>>>
>>> The Apex changes
>>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>>>
>>> The corresponding demo application, in which one operator monitors a
>>> directory for files and launches the wordcount DAG in the
>>> same application master when files are available:
>>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>>>
>>> Example of a stats listener which monitors a metric and instructs the
>>> master to start a DAG:
>>>
>>>   /** Look for more than 100 files in a directory before launching the DAG. */
>>>   @Override
>>>   public Response processStats(BatchedOperatorStats stats)
>>>   {
>>>     for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
>>>       // pendingFiles is an auto-metric.
>>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>>>       LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
>>>       if (value != null && value > 100 && !dagStarted) {
>>>         dagStarted = true;
>>>         Response resp = new Response();
>>>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>>>         counter = 0;
>>>         return resp;
>>>       }
>>>     }
>>>     return null;
>>>   }
>>>
>>>   DAGChangeSet getWordCountDag(String dir)
>>>   {
>>>     DAGChangeSet dag = new DAGChangeSet();
>>>     LineByLineFileInputOperator reader = dag.addOperator("Reader", new LineByLineFileInputOperator());
>>>     // Register this listener on the new reader as well.
>>>     List<StatsListener> listeners = new ArrayList<>();
>>>     listeners.add(this);
>>>     dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
>>>     reader.setDirectory(dir);
>>>     LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
>>>     UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
>>>     ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
>>>     dag.addStream("s1", reader.output, splitter.input);
>>>     dag.addStream("s2", splitter.words, counter.data);
>>>     dag.addStream("s3", counter.count, out.input);
>>>     return dag;
>>>   }
>>>
>>> Let me know if this type of API is acceptable for launching the DAG.
>>> This is an API to specify DAG changes; the scheduler functionality
>>> will use this API.
>>>
>>>
>>> Regards,
>>> -Tushar.
>>>
>>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <thomas@datatorrent.com> wrote:
>>>> I like the idea of keeping heavy lifting and custom code out of the master,
>>>> if possible. You find that split in responsibilities even in the case of
>>>> partitioning (Kafka connector for example). The change that requires
>>>> partitioning may be detected as byproduct of the regular processing in the
>>>> container, the information relayed to the master, the action being taken
>>>> there.
>>>>
>>>> We should separate all the different pieces and then decide where they run.
>>>> There is detecting the need for a plan change, then effecting the change
>>>> (which requires full DAG view and absolutely has to/should be in the
>>>> master).
>>>>
>>>> Thomas
>>>>
>>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>>>> Chandni.Singh@capitalone.com> wrote:
>>>>
>>>>> We have a couple of components that already run in the master: partitioners,
>>>>> stats listeners, metrics aggregators. The problem of crashing the master
>>>>> is not specific to just the scheduler, is it?
>>>>> ________________________________
>>>>> From: Tushar Gosavi <tushar@datatorrent.com>
>>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>>>> To: dev@apex.apache.org
>>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>>>>> application
>>>>>
>>>>> I was thinking about avoiding running user code in the master, as a crash
>>>>> in the master takes down all containers with it. Hence I was going for the
>>>>> scheduler as an operator: a crash in the scheduler won't kill the
>>>>> application, the master can restart the scheduler, and it can resume
>>>>> monitoring the job and change the DAG when required. But this
>>>>> will require communication between the master and the scheduler for
>>>>> monitoring of operator status/stats.
>>>>>
>>>>> It is considerably easier to put the scheduling functionality in the master,
>>>>> as we have access to operator stats and there is a communication channel
>>>>> already open between the master and the operators. A custom scheduler can
>>>>> be written as a shared stats listener, with an additional API available to
>>>>> the listener to add/remove/deploy/undeploy operators, etc.
>>>>>
>>>>> Regards,
>>>>> - Tushar.
>>>>>
>>>>>
>>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <thomas@datatorrent.com>
>>>>> wrote:
>>>>> > Right, if it runs in the app master and does not rely on unmanaged external
>>>>> > processes, then these requirements can be met.
>>>>> >
>>>>> > This capability seeks to avoid users having to deal with external
>>>>> > schedulers or workflows if all they want is to split a DAG that is
>>>>> > logically one application into multiple stages for resource optimization.
>>>>> > This is not very different from the need to have elasticity in terms of
>>>>> > partitions depending on the availability of input, as you point out.
>>>>> >
>>>>> >
>>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>>>>> > Chandni.Singh@capitalone.com> wrote:
>>>>> >
>>>>> >> Scheduling IMO belongs to the app master. Operators can influence it; e.g.,
>>>>> >> the file splitter can indicate that there are no more files to process.
>>>>> >>
>>>>> >> I don’t understand how that cannot integrate with all the aspects:
>>>>> >> operability, fault tolerance and security.
>>>>> >>
>>>>> >> Chandni
>>>>> >>
>>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chinmay@datatorrent.com> wrote:
>>>>> >>
>>>>> >> >I think it's a good idea to have a scheduling operator when you need to
>>>>> >> >start a part of the DAG when some trigger happens (e.g., FileSplitter
>>>>> >> >identifying new files in the FS) and otherwise bring it down to save
>>>>> >> >resources.
>>>>> >> >
>>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>>>>> >> >timothytiborfarkas@gmail.com> wrote:
>>>>> >> >
>>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API completely
>>>>> >> >> independent of a DAG or an operator. It could be used by a command-line tool
>>>>> >> >> running on your laptop, a script, or it could happen to be used by an
>>>>> >> >> Operator running in a DAG and a StatsListener.
>>>>> >> >>
>>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <thomas@datatorrent.com>
>>>>> >> >> wrote:
>>>>> >> >>
>>>>> >> >> > Scheduling can be independent, although we have use cases where the
>>>>> >> >> > scheduling depends on completion of processing (multi-staged batch jobs
>>>>> >> >> > where unused resources need to be freed).
>>>>> >> >> >
>>>>> >> >> > Both can be accomplished with a stats listener.
>>>>> >> >> >
>>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>>>>> >> >> > fragments as needed.
>>>>> >> >> >
>>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chandni@gmail.com>
>>>>> >> >> > wrote:
>>>>> >> >> >
>>>>> >> >> > > Hi,
>>>>> >> >> > > IMO scheduling a job can be independent of any operator while
>>>>> >> >> > > StatsListeners are not. I understand that in a lot of cases input/output
>>>>> >> >> > > operators will decide when the job ends, but there can be cases when
>>>>> >> >> > > scheduling can be independent of it.
>>>>> >> >> > >
>>>>> >> >> > > Thanks,
>>>>> >> >> > > Chandni
>>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com> wrote:
>>>>> >> >> > >
>>>>> >> >> > > > This looks like something that coordination-wise belongs in the master
>>>>> >> >> > > > and can be done with a shared stats listener.
>>>>> >> >> > > >
>>>>> >> >> > > > The operator request/response protocol could be used to relay the data for
>>>>> >> >> > > > the scheduling decisions.
>>>>> >> >> > > >
>>>>> >> >> > > > Thomas
>>>>> >> >> > > >
>>>>> >> >> > > >
>>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>>>>> >> >> > > >
>>>>> >> >> > > > > Hi Tushar,
>>>>> >> >> > > > >
>>>>> >> >> > > > > I have some questions about use case 2: Batch Support.
>>>>> >> >> > > > > I don't understand the advantages of providing batch support by having an
>>>>> >> >> > > > > operator as a scheduler.
>>>>> >> >> > > > >
>>>>> >> >> > > > > An approach that seemed a little more straightforward to me was to expose
>>>>> >> >> > > > > an API for the scheduler. If there is a scheduler set, then the master uses it
>>>>> >> >> > > > > and schedules operators. By default there isn't any scheduler and the job is
>>>>> >> >> > > > > run as it is now.
>>>>> >> >> > > > >
>>>>> >> >> > > > > Maybe this is too simplistic, but can you please let me know why having an
>>>>> >> >> > > > > operator as a scheduler is a better way?
>>>>> >> >> > > > >
>>>>> >> >> > > > > Thanks,
>>>>> >> >> > > > > Chandni
>>>>> >> >> > > > >
>>>>> >> >> > > > >
>>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tushar@datatorrent.com> wrote:
>>>>> >> >> > > > >
>>>>> >> >> > > > > >Hi All,
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >We have seen a few use cases in the field which require Apex application
>>>>> >> >> > > > > >scheduling based on some condition. This has also come up as part of
>>>>> >> >> > > > > >Batch Support in Apex previously
>>>>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>>>>> >> >> > > > > >I am proposing the following functionality in Apex to help with scheduling
>>>>> >> >> > > > > >and better resource utilization for batch jobs. Please provide your
>>>>> >> >> > > > > >comments.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Use case 1 - Dynamic DAG modification.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Each operator in the DAG consumes YARN resources. Sometimes it is
>>>>> >> >> > > > > >desirable to return the resources to YARN when no data is available
>>>>> >> >> > > > > >for processing, and to deploy the whole DAG once data starts to appear. For
>>>>> >> >> > > > > >this to happen automatically, we will need some data monitoring
>>>>> >> >> > > > > >operators running in the DAG to trigger restart and shutdown of the
>>>>> >> >> > > > > >operators in the DAG.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Apex already has such an API to dynamically change the running DAG
>>>>> >> >> > > > > >through the CLI. We could make a similar API available to operators, which
>>>>> >> >> > > > > >will trigger DAG modification at runtime. This information can be
>>>>> >> >> > > > > >passed to the master using the heartbeat RPC, and the master will make the
>>>>> >> >> > > > > >required changes to the DAG. Let me know what you think about it.
>>>>> >> >> > > > > >Something like below:
>>>>> >> >> > > > > >context.beginDagChange();
>>>>> >> >> > > > > >context.addOperator("o1"); <== launch operator from previous check-pointed state.
>>>>> >> >> > > > > >context.addOperator("o2", new Operator2()); <== create new operator
>>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream operators from the DAG.
>>>>> >> >> > > > > >context.apply();  <== DAG changes will be sent to the master, and the master
>>>>> >> >> > > > > >will apply these changes.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Similarly, an API for other functionality, such as locality settings,
>>>>> >> >> > > > > >needs to be provided.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Use case 2 - Classic Batch Scheduling.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Provide an API callable from an operator to launch a DAG. The operator
>>>>> >> >> > > > > >will prepare a DAG object and submit it to YARN; the DAG will be
>>>>> >> >> > > > > >scheduled as a new application. This way complex schedulers can be
>>>>> >> >> > > > > >written as operators.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >public class SchedulerOperator implements Operator {
>>>>> >> >> > > > > >   void handleIdleTime() {
>>>>> >> >> > > > > >     // check for conditions to start a job (for example, enough files are
>>>>> >> >> > > > > >     // available, enough items are available in Kafka, or a time has been reached)
>>>>> >> >> > > > > >     DAG dag = context.createDAG();
>>>>> >> >> > > > > >     dag.addOperator();
>>>>> >> >> > > > > >     dag.addOperator();
>>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>>>>> >> >> > > > > >     lOptions.oldId = ""; // start from this checkpoint.
>>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>>>>> >> >> > > > > >   }
>>>>> >> >> > > > > >}
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of the
>>>>> >> >> > > > > >application, or to kill the DAG:
>>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
>>>>> >> >> > > > > >dagHandler.status()  <== get the status of the application.
>>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>>>>> >> >> > > > > >dagHandler.shutdown() <== shut down the application.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >More complex scheduler operators could be written to manage
>>>>> >> >> > > > > >workflows, i.e. a DAG of DAGs, using these APIs.
>>>>> >> >> > > > > >
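The four DagHandler methods listed in the proposal could be modelled along the following lines. This is a sketch under assumptions rather than the proposed implementation: the AppState values are invented for illustration, the in-memory class completes a future instead of talking to YARN, and treating shutdown() as a graceful finish (as opposed to kill()) is a guess at the intended distinction.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical application states; not taken from Apex or YARN. */
enum AppState { RUNNING, FINISHED, KILLED }

/** Hypothetical handle, named after the DagHandler in the proposal. */
interface DagHandler {
  AppState status();            // current state of the launched application
  AppState waitForCompletion(); // block until the DAG terminates
  void kill();                  // forcefully stop the application
  void shutdown();              // assumed here to mean a graceful stop
}

/** In-memory stand-in: the first terminal transition wins, later ones are ignored. */
class InMemoryDagHandler implements DagHandler {
  private final CompletableFuture<AppState> done = new CompletableFuture<>();

  @Override
  public AppState status() {
    return done.isDone() ? done.join() : AppState.RUNNING;
  }

  @Override
  public AppState waitForCompletion() {
    return done.join(); // blocks until kill() or shutdown() completes the future
  }

  @Override
  public void kill() {
    done.complete(AppState.KILLED);
  }

  @Override
  public void shutdown() {
    done.complete(AppState.FINISHED);
  }
}

class DagHandlerDemo {
  public static void main(String[] args) {
    DagHandler handler = new InMemoryDagHandler();
    System.out.println(handler.status());
    handler.shutdown();
    System.out.println(handler.waitForCompletion());
  }
}
```

A scheduler managing a DAG of DAGs could hold one such handle per launched stage and start the next stage once waitForCompletion() returns.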
>>>>> >> >> > > > > >Regards,
>>>>> >> >> > > > > >-Tushar.
>>>>> >> >> > > > >
>>>>> >> >> > > > > ________________________________________________________
>>>>> >> >> > > > >
>>>>> >> >> > > > > The information contained in this e-mail is confidential and/or
>>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and may only be used
>>>>> >> >> > > > > solely in performance of work or services for Capital One. The information
>>>>> >> >> > > > > transmitted herewith is intended only for use by the individual or entity
>>>>> >> >> > > > > to which it is addressed. If the reader of this message is not the intended
>>>>> >> >> > > > > recipient, you are hereby notified that any review, retransmission,
>>>>> >> >> > > > > dissemination, distribution, copying or other use of, or taking of any
>>>>> >> >> > > > > action in reliance upon this information is strictly prohibited. If you
>>>>> >> >> > > > > have received this communication in error, please contact the sender and
>>>>> >> >> > > > > delete the material from your computer.
>>>>> >> >> > > > >
>>>>> >> >> > > > >
>>>>> >> >> > > >
>>>>> >> >> > >
>>>>> >> >> >
>>>>> >> >>
>>>>> >>
>>>>> >>
>>>>>
