apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
Date Fri, 08 Jul 2016 13:18:39 GMT
Hi All,

I have done an initial prototype which allows a stat listener to
specify DAG changes; the changes are applied asynchronously.

The changes involved are:
- Add a DAGChangeSet object, which inherits from DAG and adds methods to
  remove operators and streams.

- The stat listener returns this object in its Response, and the platform
  applies the changes specified in the response to the running DAG.
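
To make the surface concrete, here is a minimal sketch of the shape
described above; the interface name and the removal-method names are
illustrative assumptions, and the actual DAGChangeSet is in the apex-core
branch linked below.

  import com.datatorrent.api.DAG;

  // Illustrative sketch only; see the linked branch for the real prototype.
  // A change set supports everything a DAG does for additions, plus explicit
  // removal of operators and streams by their logical names.
  interface DagChangeSetSurface extends DAG
  {
    void removeOperator(String name);  // remove a logical operator by name
    void removeStream(String name);    // remove a logical stream by name
  }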


The apex-core changes:
https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1

The corresponding demo application, in which one operator monitors a
directory for files and launches the word-count DAG in the same application
master when files become available:
https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular

Example of a stat listener which monitors a metric and instructs the master
to start a DAG:

  /** Look for more than 100 files in the directory before launching the DAG. */
  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
      // pendingFiles is an auto-metric.
      Integer value = (Integer)ws.metrics.get("pendingFiles");
      LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
      if (value != null && value > 100 && !dagStarted) {
        dagStarted = true;
        Response resp = new Response();
        resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
        counter = 0;
        return resp;
      }
    }
    return null;
  }

  DAGChangeSet getWordCountDag(String dir)
  {
    DAGChangeSet dag = new DAGChangeSet();
    LineByLineFileInputOperator reader = dag.addOperator("Reader", new LineByLineFileInputOperator());
    List<StatsListener> listeners = new ArrayList<>();
    listeners.add(this);
    dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
    reader.setDirectory(dir);
    LineSplitter splitter = dag.addOperator("Splitter", new LineSplitter());
    UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
    ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
    dag.addStream("s1", reader.output, splitter.input);
    dag.addStream("s2", splitter.words, counter.data);
    dag.addStream("s3", counter.count, out.input);
    return dag;
  }
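
For the removal side of the API, here is a minimal sketch of a change set
that tears the same word-count fragment back down, for example once the
batch completes; a stat listener could return it in a Response the same way.
The removeStream/removeOperator method names are illustrative assumptions,
not the final API.

  DAGChangeSet getTeardownDag()
  {
    DAGChangeSet dag = new DAGChangeSet();
    // Remove the streams first, then the operators they connect.
    dag.removeStream("s1");
    dag.removeStream("s2");
    dag.removeStream("s3");
    dag.removeOperator("Reader");
    dag.removeOperator("Splitter");
    dag.removeOperator("Counter");
    dag.removeOperator("Output");
    return dag;
  }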

Let me know if this type of API is acceptable for launching a DAG. This is
an API for specifying DAG changes; the scheduler functionality will build on
top of it.


Regards,
-Tushar.

On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <thomas@datatorrent.com> wrote:
> I like the idea of keeping heavy lifting and custom code out of the master,
> if possible. You find that split in responsibilities even in the case of
> partitioning (the Kafka connector, for example): the change that requires
> partitioning may be detected as a byproduct of the regular processing in the
> container, the information relayed to the master, and the action taken
> there.
>
> We should separate all the different pieces and then decide where they run.
> There is detecting the need for a plan change, and then effecting the change
> (which requires the full DAG view and absolutely has to, or at least should,
> be in the master).
>
> Thomas
>
> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
>> We have a couple of components that already run in the master - partitioners,
>> stats listeners, metrics aggregators. The problem of crashing the master
>> is not specific to just the scheduler, is it?
>> ________________________________
>> From: Tushar Gosavi <tushar@datatorrent.com>
>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>> To: dev@apex.apache.org
>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>> application
>>
>> I was thinking about avoiding running user code in the master, as a crash
>> in the master takes down all containers with it. Hence I was going for the
>> scheduler as an operator: a crash in the scheduler won't kill the
>> application; the master can restart the scheduler, and it can start
>> monitoring the job again and change the DAG when required. But this will
>> require communication between the master and the scheduler for monitoring
>> of operator status/stats.
>>
>> It is considerably easier to put the scheduling functionality in the master,
>> as we have access to operator stats and there is a communication channel
>> already open between the master and the operators. A custom scheduler can
>> then be written as a shared stat listener, with additional APIs available
>> to the listener to add/remove/deploy/undeploy operators, etc.
>>
>> Regards,
>> - Tushar.
>>
>>
>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <thomas@datatorrent.com>
>> wrote:
>> > Right, if it runs in the app master and does not rely on unmanaged
>> > external processes, then these requirements can be met.
>> >
>> > This capability seeks to avoid users having to deal with external
>> > schedulers or workflows if all they want is to split a DAG that is
>> > logically one application into multiple stages for resource optimization.
>> > This is not very different from the need to have elasticity in terms of
>> > partitions depending on the availability of input, as you point out.
>> >
>> >
>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>> > Chandni.Singh@capitalone.com> wrote:
>> >
>> >> Scheduling IMO belongs to the App master. Operators can influence it;
>> >> for example, a file splitter can indicate that there are no more files
>> >> to process.
>> >>
>> >> I don't understand how that could fail to integrate with all the
>> >> aspects - operability, fault tolerance and security.
>> >>
>> >> Chandni
>> >>
>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chinmay@datatorrent.com>
>> >> wrote:
>> >>
>> >> >I think it's a good idea to have a scheduling operator when you need to
>> >> >start a part of the DAG when some trigger happens (for example,
>> >> >FileSplitter identifying new files in the FS) and otherwise bring it
>> >> >down to save resources.
>> >> >
>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >> >timothytiborfarkas@gmail.com> wrote:
>> >> >
>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >> >> completely independent of a DAG or an operator. It could be used by a
>> >> >> command-line tool running on your laptop, by a script, or it could
>> >> >> happen to be used by an Operator running in a DAG and a StatsListener.
>> >> >>
>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <thomas@datatorrent.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Scheduling can be independent, although we have use cases where
>> >> >> > the scheduling depends on completion of processing (multi-staged
>> >> >> > batch jobs where unused resources need to be freed).
>> >> >> >
>> >> >> > Both can be accomplished with a stats listener.
>> >> >> >
>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> >> > fragments as needed.
>> >> >> >
>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >> >> > <singh.chandni@gmail.com> wrote:
>> >> >> >
>> >> >> > > Hi,
>> >> >> > > IMO scheduling a job can be independent of any operator, while
>> >> >> > > StatsListeners are not. I understand that in a lot of cases
>> >> >> > > input/output operators will decide when the job ends, but there
>> >> >> > > can be cases when scheduling is independent of it.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > > Chandni
>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com>
>> >> >> > > wrote:
>> >> >> > >
>> >> >> > > > This looks like something that coordination-wise belongs in
>> >> >> > > > the master and can be done with a shared stats listener.
>> >> >> > > >
>> >> >> > > > The operator request/response protocol could be used to relay
>> >> >> > > > the data for the scheduling decisions.
>> >> >> > > >
>> >> >> > > > Thomas
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni
>> >> >> > > > <Chandni.Singh@capitalone.com> wrote:
>> >> >> > > >
>> >> >> > > > > Hi Tushar,
>> >> >> > > > >
>> >> >> > > > > I have some questions about use case 2: Batch Support.
>> >> >> > > > > I don't understand the advantages of providing batch support
>> >> >> > > > > by having an operator as a scheduler.
>> >> >> > > > >
>> >> >> > > > > An approach that seemed a little more straightforward to me
>> >> >> > > > > was to expose an API for a scheduler. If there is a scheduler
>> >> >> > > > > set, then the master uses it and schedules operators. By
>> >> >> > > > > default there isn't any scheduler, and the job runs as it
>> >> >> > > > > does now.
>> >> >> > > > >
>> >> >> > > > > Maybe this is too simplistic, but can you please let me know
>> >> >> > > > > why having an operator as a scheduler is a better way?
>> >> >> > > > >
>> >> >> > > > > Thanks,
>> >> >> > > > > Chandni
>> >> >> > > > >
>> >> >> > > > >
>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi"
>> >> >> > > > > <tushar@datatorrent.com> wrote:
>> >> >> > > > >
>> >> >> > > > > >Hi All,
>> >> >> > > > > >
>> >> >> > > > > >We have seen a few use cases in the field which require
>> >> >> > > > > >Apex application scheduling based on some condition. This
>> >> >> > > > > >has also come up previously as part of Batch Support in Apex
>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>> >> >> > > > > >I am proposing the following functionality in Apex to help
>> >> >> > > > > >with scheduling and better resource utilization for batch
>> >> >> > > > > >jobs. Please provide your comments.
>> >> >> > > > > >
>> >> >> > > > > >Usecase 1 - Dynamic DAG modification.
>> >> >> > > > > >
>> >> >> > > > > >Each operator in a DAG consumes YARN resources. Sometimes it
>> >> >> > > > > >is desirable to return the resources to YARN when no data is
>> >> >> > > > > >available for processing, and to deploy the whole DAG once
>> >> >> > > > > >data starts to appear. For this to happen automatically, we
>> >> >> > > > > >will need some data-monitoring operators running in the DAG
>> >> >> > > > > >to trigger restart and shutdown of the operators in the DAG.
>> >> >> > > > > >
>> >> >> > > > > >Apex already has such an API for dynamically changing the
>> >> >> > > > > >running DAG through the CLI. We could make a similar API
>> >> >> > > > > >available to operators, which would trigger DAG modification
>> >> >> > > > > >at runtime. This information can be passed to the master
>> >> >> > > > > >using the heartbeat RPC, and the master will make the
>> >> >> > > > > >required changes to the DAG. Let me know what you think
>> >> >> > > > > >about it; something like below:
>> >> >> > > > > >Context.beginDagChange();
>> >> >> > > > > >context.addOperator("o1");  <== launch operator from its
>> >> >> > > > > >previous check-pointed state
>> >> >> > > > > >context.addOperator("o2", new Operator2());  <== create a
>> >> >> > > > > >new operator
>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >> >> > > > > >context.shutdown("o3");  <== delete this operator and its
>> >> >> > > > > >downstream operators from the DAG
>> >> >> > > > > >context.apply();  <== the DAG changes will be sent to the
>> >> >> > > > > >master, and the master will apply them
>> >> >> > > > > >
>> >> >> > > > > >Similarly, APIs for other functionality, such as locality
>> >> >> > > > > >settings, need to be provided.
>> >> >> > > > > >
>> >> >> > > > > >
>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >> >> > > > > >
>> >> >> > > > > >Provide an API callable from an operator to launch a DAG.
>> >> >> > > > > >The operator will prepare a DAG object and submit it to
>> >> >> > > > > >YARN; the DAG will be scheduled as a new application. This
>> >> >> > > > > >way complex schedulers can be written as operators.
>> >> >> > > > > >
>> >> >> > > > > >public class SchedulerOperator implements Operator
>> >> >> > > > > >{
>> >> >> > > > > >  void handleIdleTime()
>> >> >> > > > > >  {
>> >> >> > > > > >    // Check for conditions to start a job (for example,
>> >> >> > > > > >    // enough files available, enough items available in
>> >> >> > > > > >    // Kafka, or a given time has been reached).
>> >> >> > > > > >    DAG dag = context.createDAG();
>> >> >> > > > > >    dag.addOperator(...);
>> >> >> > > > > >    dag.addOperator(...);
>> >> >> > > > > >    LaunchOptions lOptions = new LaunchOptions();
>> >> >> > > > > >    lOptions.oldId = ""; // start from this checkpoint
>> >> >> > > > > >    DagHandler dagHandler = context.submit(dag, lOptions);
>> >> >> > > > > >  }
>> >> >> > > > > >}
>> >> >> > > > > >
>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>> >> >> > > > > >the application, or to kill the DAG:
>> >> >> > > > > >dagHandler.waitForCompletion();  <== wait till the DAG terminates
>> >> >> > > > > >dagHandler.status();  <== get the status of the application
>> >> >> > > > > >dagHandler.kill();  <== kill the running application
>> >> >> > > > > >dagHandler.shutdown();  <== shut down the application
>> >> >> > > > > >
>> >> >> > > > > >More complex scheduler operators could be written using
>> >> >> > > > > >these APIs to manage workflows, i.e. DAGs of DAGs.
>> >> >> > > > > >
>> >> >> > > > > >Regards,
>> >> >> > > > > >-Tushar.
>> >> >> > > > >
