airflow-dev mailing list archives

From Becket Qin <becket....@gmail.com>
Subject Re: [AIP-35] Add Signal Based Scheduling To Airflow
Date Fri, 19 Jun 2020 15:13:02 GMT
Hi Gerard and Dan,

Thanks for the comments and suggestions. They are very helpful.

Please see the replies below:

*Re: Gerard*

> • I agree on calling it event based scheduling, it may be less misleading.

Event-based scheduling it is. I'll change the term from now on.

> In my opinion, I think the best way would be to extend BaseOperator to
> incorporate signals as suggested before. Adding a new operator seems like
> it will make things more complex for the end user.

Sure, either way is fine with me. The introduction of SignalOperator came
from the assumption that we will deprecate BaseOperator after completely
switching to event-based scheduling. Thinking about this a bit more, I am
unsure whether BaseOperator needs to inherit from SignalOperator. It might
be better to have something like a BaseOperatorWrapper, which is a
SignalOperator that wraps a BaseOperator.

For the existing operators that inherit BaseOperator, BaseOperatorWrapper
has the on_event() method implemented to mimic the same behavior as
dependency-based scheduling, so the existing operators will just work
without any change. For new operator implementations, if the authors choose
not to use events at all, they may just implement a BaseOperator; otherwise,
they can implement a SignalOperator. In either case, the actual scheduling
will be event-based.

> • We should allow users to configure if they want to use Event based
> scheduling (async/signal) or operator dependency scheduling
> (sync/scheduler). This would allow users to choose the mode they want to
> run their DAG, and potentially also change the execution UI for the DAG.

I think event-based scheduling supports operator dependency scheduling; it
is just a different implementation. And with that implementation, in
addition to operator dependency scheduling, we can trigger scheduling with
something other than an operator dependency.

> • Still concerned about adding logic to issue signals in user code.

Can you elaborate a little bit on the concerns? Is it about exposing the
scheduling control to users without a pre-defined order? Or is it something
else?

> • We should also consider how we would implement a use case where it's
> not just a long running task, but several tasks that run on different
> schedules and need a common downstream task to be run after either of
> them finishes. (example: train a new model every time new examples are
> available or new embeddings are available). This is why I think a more
> high level design could benefit the proposal. Aka async DAG execution.

This is exactly one of the use cases we would like to support with
event-based scheduling. In the current proposal, this would be achieved as
follows (there is a code sketch after the steps):

1. The training operator will have two EventKeys as its interested events:
"NewExample" and "NewEmbedding".
2. The scheduler retrieves the interested event keys of each operator in
the DAG and keeps an EventKey -> InterestedOperatorList mapping. In this
case, the training operator will appear in both operator lists of
"NewExample" and "NewEmbedding".
3. When one of those two events arrives, the scheduler checks the EventKey
and calls the Operator#on_events(Event) method on each of the operators in
the InterestedOperatorList to get the action that needs to be taken on that
operator. In this case, the operator in question would return START when it
sees either "NewExample" or "NewEmbedding".
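To make the flow concrete, here is a minimal runnable sketch of steps 2 and
3. EventKey, Event, on_events() and the START action are names from the
proposal; the class shapes and method names are otherwise made up for
illustration:

# Minimal sketch of steps 2 and 3 above. EventKey, Event, on_events() and
# START are names from the proposal; the class shapes here are made up.
from collections import defaultdict

class Event:
    def __init__(self, key):
        self.key = key  # the EventKey, e.g. "NewExample"

class TrainingOperator:
    def interested_event_keys(self):
        # Step 1: the operator declares the EventKeys it cares about.
        return {"NewExample", "NewEmbedding"}

    def on_events(self, event):
        # Either event is enough to (re)start a training run.
        return "START" if event.key in self.interested_event_keys() else None

class EventBasedScheduler:
    def __init__(self, operators):
        # Step 2: build the EventKey -> InterestedOperatorList mapping.
        self.interested = defaultdict(list)
        for op in operators:
            for key in op.interested_event_keys():
                self.interested[key].append(op)

    def handle_event(self, event):
        # Step 3: ask each interested operator which action to take.
        for op in self.interested[event.key]:
            if op.on_events(event) == "START":
                print(f"starting {type(op).__name__} for event {event.key}")

scheduler = EventBasedScheduler([TrainingOperator()])
scheduler.handle_event(Event("NewEmbedding"))  # starts the training operator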

> • UI is going to be affected by the clear-task-instance behavior you
> describe. As a user I will only be able to see the latest task instance and
> that’s no good. Web UI will need to be changed to accommodate this and we
> will need to find a way to have multiple task instances of a single task_id
> + lineage visualization for understanding what event/task instance
> generated what execution of a task instance.

Yes, you are right. The UI also needs to change so that it shows the entire
execution history of a task in this case. That should probably also be
included in the AIP wiki.


*Re: Dan*
The Google doc we were referring to is here.
https://docs.google.com/document/d/1Wps5iaGONFdDYgQIR4uTQ9Q4fY6hWatS1crsuLmacys/edit?ts=5ee277bc#heading=h.q7bq5z8z3jqs

1. Regarding the scope of the PR.

Supporting streaming jobs is actually an important motivation of the
proposal. It is true that Airflow was not designed to handle long running
jobs. In fact, we looked around and did not find any workflow tooling that
supports streaming jobs, and that is one of the reasons we created this
AIP.

Although this AIP seemingly moves Airflow a little towards service
orchestration by adding support for long running jobs, there is still a
quite clear boundary between Airflow and service orchestration. A service
orchestrator does much more than a workflow manager; it usually has to deal
with things like resource management, environment setup, auto scaling and
cross-workflow coordination, etc. This AIP only proposes another mechanism
to deploy a single workflow. It does not even introduce semantics for
cross-workflow communication. That being said, it is true that with this
change it would be easier to integrate Airflow into a service orchestration
tool.

2. About the interaction between signals/events and existing Airflow
abstractions.

> 2. Have we thought through the interactions between signals and existing
> Airflow abstractions especially with respect to edge cases, e.g. handling
> of missed signals, duplicated signals, zombie signals that no longer exist,
> interactions of STOP and RESTART with task retries, interactions of signal
> actions (e.g. start task) and scheduler actions (e.g. start task due to
> cron schedule), how signals will work with priority mechanisms like task
> priorities/non_pooled_task_slot_count etc, how each signal will interact
> with each task state (e.g. user marks tasks failed in the UI), or will
> that happen in a subsequent doc/pull request?
>


In particular I'm curious about the interactions between scheduler polling
> logic that results in task state changes and the new signals system as I'm
> concerned that having both systems will add complexity and we should try to
> standardize on one (e.g. instead of a task getting retried by the scheduler
> if it runs a polling loop and sees that the task received a failed signal
> OR the task getting retried by the scheduler receiving a restart task,
> instead we remove the polling logic and replace it with a signal handler
> for tasks that send a FAILED signal). I'm also curious how the REST API
> plays into this since it feels like there is some overlap with that as
> well.


These are great questions. I don't think we have thought all of them
through at this point, but I'll put some quick ideas here. Please forgive
me if these ideas are too far from what Airflow looks like right now.

   - Handling missing / duplicate events.
      - On the event sender side, an ack would be necessary to ensure an
      event is successfully sent; otherwise a retry can be issued. This might
      result in duplicates, but that is solvable by deduplication.
      - On the event receiver side, given that we are using a pull model,
      it is unlikely that an event would be missed.
   - I am not sure what "zombie events" means. Can you elaborate? I think if
   an event has been sent to the notification service, that event has
   happened, regardless of what happened afterwards.
   - Interaction with task retries.
      - Are the retries defined on each operator? If so, I imagine that with
      a SignalOperator, the retries would be implemented in the
      SignalOperator#on_event() method. When operator.on_event(TaskFailedEvent)
      is invoked, the operator simply returns START, and that is essentially
      a retry (sketched in the code after the pattern below).
   - Priority / non_pooled_task_slot_count
      - Would the following basic pattern work with them?

The basic pattern in the runtime here is as follows:

   - *Events* are just facts that are passed to the operators.
   - Airflow provides some *Actions* the operators can ask Airflow to take,
   e.g. task start / stop / restart.
   - Airflow passes the events to the operators, and lets the operators
   decide what actions to take.
   - At the API level, Airflow has some predefined event -> action mappings
   for the operators, e.g. in the case of task dependency based scheduling,
   on_event(task_finished) of an operator may be overridden to return START,
   because the action is already defined when users construct the DAG with
   the API.
   - The REST API requests can be considered as just another kind of event,
   also handled by the scheduler.
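As a sketch of how retries could fall out of this pattern (again,
TaskFailedEvent, on_event() and the action values are assumed names from
this discussion, not existing Airflow classes):

# Sketch only: TaskFailedEvent and the action values are assumed names
# from this discussion, not existing Airflow classes.
class TaskFailedEvent:
    pass

class RetryingOperator:
    """Retries become an event -> action mapping inside the operator."""
    def __init__(self, retries=3):
        self.retries = retries
        self._attempts = 0

    def on_event(self, event):
        if isinstance(event, TaskFailedEvent):
            if self._attempts < self.retries:
                self._attempts += 1
                return "START"  # a START after a failure is effectively a retry
        return None  # retries exhausted, or an event we don't act on

op = RetryingOperator(retries=1)
print(op.on_event(TaskFailedEvent()))  # START (first retry)
print(op.on_event(TaskFailedEvent()))  # None (retries exhausted)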


Thanks again for the great feedback and questions.

Cheers,

Jiangjie (Becket) Qin


On Fri, Jun 19, 2020 at 6:08 AM Dan Davydov <ddavydov@twitter.com.invalid>
wrote:

> Metacomment:
> You might want to consider moving this discussion to a google doc or
> something since it seems like a lot of divergent threads are being created
> due to the scope of this change, which can be a bit hard to keep track of
> in email. If folks are concerned about history we can dump the google doc
> contents/comments into the AIP at the end. I see that you mentioned a
> google doc but I didn't see a link so I'm assuming it was some private one.
>
>
> At a high level I think this is a really great idea, and will unlock the
> ability to remove a lot of the bottlenecks of Airflow that are based around
> the current polling-based task scheduling, and also prevent the need for
> sensor operators in a lot of cases.
>
> 1.
>
> >  I'm in general supportive of this idea of supporting streaming jobs.
>
> It's my understanding that the scope of this PR is not to support streaming
> tasks in Airflow (Nicholas if I'm incorrect let me know since this is an
> important decision). Airflow isn't really designed to work well with
> streaming, typically you use an http operator or something like that to
> send signal requests etc. Otherwise Airflow would need to become kind of a
> service orchestrator which I think would be outside of the scope of what
> Airflow is designed to do and would be hard to implement in a clean way
> using Airflow's current primitives like tasks/dagruns/xcom, though it's
> possible I am wrong. I haven't been at Airbnb for a while now, so take this
> with a grain of salt, but the usage of Airflow as a service orchestrator
> IIRC was more of a hack to work around the fact that there was no easy way
> to launch lightweight python services in Kubernetes, so Airflow
> tasks were used instead.
>
> 2. Have we thought through the interactions between signals and existing
> Airflow abstractions especially with respect to edge cases, e.g. handling
> of missed signals, duplicated signals, zombie signals that no longer exist,
> interactions of STOP and RESTART with task retries, interactions of signal
> actions (e.g. start task) and scheduler actions (e.g. start task due to
> cron schedule), how signals will work with priority mechanisms like task
> priorities/non_pooled_task_slot_count etc, how each signal will interact
> with each task state (e.g. user marks tasks failed in the UI), or will
> that happen in a subsequent doc/pull request?
>
> In particular I'm curious about the interactions between scheduler polling
> logic that results in task state changes and the new signals system as I'm
> concerned that having both systems will add complexity and we should try to
> standardize on one (e.g. instead of a task getting retried by the scheduler
> if it runs a polling loop and sees that the task received a failed signal
> OR the task getting retried by the scheduler receiving a restart task,
> instead we remove the polling logic and replace it with a signal handler
> for tasks that send a FAILED signal). I'm also curious how the REST API
> plays into this since it feels like there is some overlap with that as
> well.
>
> Another example of something that should probably be moved to the signal
> system is this part of the AIP:
>
> >   3. Regarding this section in the AIP:
> >     Step 3. Each available worker pulls a TaskInstance from the queue and
> > > executes it, and sends a signal to the notification service
> > > indicating that the task state changed from “queued” to “running”. The
> > > scheduling loop will then update the task state in the database when
> > > receiving this signal.
> >   Shouldn't the DB update logic happen in the schedule signal listener
> > instead of the poll-based scheduling loop?
>
>
>
> On Thu, Jun 18, 2020 at 3:00 PM Gerard Casas Saez
> <gcasassaez@twitter.com.invalid> wrote:
>
> > Re-read this a couple of times. Some thoughts:
> >
> > • I agree on calling it event based scheduling, it may be less misleading.
> > • I believe we can mostly all agree that this is something interesting to
> > add to Airflow and a use case that would be good to support. Nailing the
> > details and path to implement is important though. (or at least that’s my
> > understanding)
> > • In my opinion, I think the best way would be to extend BaseOperator to
> > incorporate signals as suggested before. Adding a new operator seems like
> > it will make things more complex for the end user.
> >     • We should allow users to configure if they want to use Event based
> > scheduling (async/signal) or operator dependency scheduling
> > (sync/scheduler). This would allow users to choose the mode they want to
> > run their DAG, and potentially also change the execution UI for the DAG.
> > • Still concerned about adding logic to issue signals in user code.
> >     • An alternative option here is to use XCom/data lineage artifacts
> > for the signal backend. Aka when an operator A pushes a new value of
> > XCom keyed K, execute operator B with that XCom value. Most of the use
> > cases I can think of for event based scheduling can adapt to this
> > concept, where downstream operators need to run whenever a new data
> > artifact is generated by an upstream operator.
> >     • Issue here is we don't have a good data artifact representation in
> > Airflow as they usually tend to be implicit (over explicit). Hopefully
> > with Functional DAGs we can improve this.
> > • We should also consider how we would implement a use case where it's
> > not just a long running task, but several tasks that run on different
> > schedules and need a common downstream task to be run after either of
> > them finishes. (example: train a new model every time new examples are
> > available or new embeddings are available). This is why I think a more
> > high level design could benefit the proposal. Aka async DAG execution.
> > • UI is going to be affected by the clear-task-instance behavior you
> > describe. As a user I will only be able to see the latest task instance
> > and that’s no good. Web UI will need to be changed to accommodate this
> > and we will need to find a way to have multiple task instances of a
> > single task_id + lineage visualization for understanding what event/task
> > instance generated what execution of a task instance.
> >
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> > On Jun 17, 2020, 10:22 AM -0600, Becket Qin <becket.qin@gmail.com> wrote:
> > > Hi Kevin and Ash,
> > >
> > > Thanks for the feedback / comments / suggestions. It is great to know
> > > that Airbnb has already been running stream jobs using Airflow and
> > > there might be a working solution.
> > >
> > > First of all, I'd like to say that we are still quite new to Airflow;
> > > our proposal has likely overlooked many things, and that's why we
> > > would like to get advice from the community. So we really appreciate
> > > suggestions and feedback.
> > >
> > > I can't agree more that we should introduce mechanisms to Airflow
> > > instead of just making it support some specific use cases. The two
> > > scenarios mentioned by Nicholas above are just examples to explain how
> > > signal-based scheduling enables new possibilities. It does not mean
> > > that our goal is just to make those two cases work.
> > >
> > > Just to add some background to the proposal, the original motivation
> > > of this AIP actually came from our project in the Apache Flink
> > > community called AI Flow [1][2]. It aims to define a clear API for
> > > people to describe their machine learning workflows that contain both
> > > stream and batch jobs. We have a basic implementation of signal-based
> > > scheduling but would like to switch to Airflow so we don't re-invent
> > > the wheel.
> > >
> > > In any case, it is probably useful to clarify a little bit the
> > > rationale behind the proposal. The current AIP-35 proposes two major
> > > changes:
> > >
> > > *Signal-based scheduling*
> > > The key idea is to expand the conditions that trigger a scheduling
> > > action, from job status changes to a more extensible concept of
> > > *signal* or *event*. This concept extension brings two benefits:
> > >
> > > 1. Make Airflow capable of dealing more smoothly with richer types of
> > > workflows, including long running jobs.
> > > Although the two cases mentioned above may be supported with Airflow
> > > as is, the solutions seem like workarounds and are not designed as
> > > first class citizens. They essentially rely on the user code to put
> > > together some disjoint workflow submissions to form a complete
> > > business logic. Ideally we would like to see the entire business
> > > logic as one integral workflow.
> > > 2. Allow downstream projects to integrate with Airflow more easily.
> > > As an example, imagine an operator in a workflow which deploys a model
> > > to an online inference service. In some cases, such a model deployment
> > > also requires external approvals which are not part of the workflow.
> > > With a scheduler taking external signals, it becomes much easier to
> > > integrate the workflow with internal approval systems.
> > >
> > > *Signal notification service*
> > > The signal notification service is also helpful in two aspects:
> > >
> > > 1. It helps isolate the signal sender from the signal receivers.
> > > That means an operator that sends a signal does not need to be aware
> > > of who might be listening to the signal and what actions have to be
> > > taken. One signal may have multiple listeners, and thus multiple
> > > workflow actions may be triggered.
> > > 2. It allows pluggability and, again, integration with existing
> > > systems. For example, some users may decide to have a Kafka based
> > > notification service.
> > >
> > > We do realize that this proposal is a significant change to Airflow
> > > and expect it to take some time to sort everything out. I am wondering
> > > if doing the following in order would help the discussion.
> > >
> > > 1. First reach consensus on whether this is worth doing or not.
> > > 2. If we think it is worth doing, decide what the ideal final state
> > > would be.
> > > 3. Come up with the implementation plan and migration path from the
> > > current state to the ideal final state.
> > >
> > > What do you think?
> > >
> > > Re: Ash
> > > About some of the questions and suggestions.
> > >
> > > And I'm also not a fan of the name -- perhaps just my background and
> > > > dealing with the scheduler and executor code -- but to me a signal
> is a
> > > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> > described
> > > > as "Event-driven Scheduling".
> > >
> > >
> > > I don't have a strong opinion on the naming. We picked "signal" instead
> > of
> > > "event" simply because that sounds more relevant to "control". But I
> > think
> > > "event" is also a suitable term and it is also intuitive enough.
> > >
> > > In terms of your proposed class layout:
> > > > class BaseOperator(SignalOperator, LoggingMixin):
> > > > Don't create the extra class, just put new things on BaseOperator.
> > >
> > >
> > > Tao raised a similar question in the google doc. I agree that it
> > > would work by putting new things on the BaseOperator. I think one
> > > reason for having a separate SignalOperator class is that,
> > > theoretically speaking, the scheduler only needs to deal with the
> > > SignalOperator interface. If so, we may want to eventually migrate
> > > from the current BaseOperator to SignalOperator. The job status change
> > > will just become one type of signal, so new operators might be able to
> > > just implement a SignalOperator instead of BaseOperator. Having a
> > > separate SignalOperator would make it easier to eventually converge to
> > > SignalOperator and deprecate the current BaseOperator.
> > >
> > > Doesn't this require custom code inside the streaming job to report
> > > > events back to Airflow?
> > >
> > >
> > > For the job status change events / signals, users don't have to do
> > > anything. It is just that currently the job status is communicated
> > > back to the scheduler via the database; now it is going to be via the
> > > signal notification service.
> > > For the signals that are not job status related, the user code needs
> > > to send a signal / event at some point.
> > >
> > > 3. How do we notice if that long running task dies without a trace --
> > > > i.e. if it crashes hard/the Kubernetes node it is on dies, etc and
> > > > never signals that it errored how does Airflow know?
> > >
> > >
> > > In our basic implementation, we have a job status listener component
> > > that can work with K8S / YARN / other cloud platforms to keep track
> > > of the job status changes, and the job status change signals are
> > > actually sent by it. Does Airflow right now also listen to the status
> > > of the jobs running in K8S? Can you help us understand a bit more
> > > about the current failure handling model of Airflow?
> > >
> > > 4. Your new key component, the Signal Notification Service, is not
> > > > clearly defined.
> > > > Where does it store things?
> > > > Does it need persistent storage, or is in-memory good enough?
> > > > Does it need at-most-once delivery semantics? At-least-once delivery?
> > > > Does the scheduler pull from it, or does signal service push to the
> > > > scheduler.
> > > > What happens when there are more than one scheduler running (AIP-15,
> > > > which I am working on right now)
> > > > How do we run more than one notification service component (i.e. HA
> --
> > > > once the scheduler is HA we can't have any other new components
> being a
> > > > single point of failure)
> > > > Do signals get delivered to just one scheduler, or all schedulers?
> > >
> > >
> > > Sorry about the confusion. It is true that a bunch of details are
> > > missing here. We will add them to the wiki. To answer the questions
> > > quickly:
> > >
> > > - In our basic implementation, the default signal notification service
> > > stores the signals in MySQL.
> > > - The notification service has a REST API that works in a long-poll
> > > manner, i.e. a client sends an http request with the last version of
> > > the keys it has seen, and that request waits on the server side until
> > > one of the following is met:
> > > - The request timeout is reached, in which case an empty response is
> > > returned.
> > > - A new version of an interested signal is available. The notification
> > > service returns all the SignalVersions newer than the version last
> > > seen by the client.
> > > - The scheduler pulls from the notification service, but in a
> > > long-poll manner.
> > > - Off the top of my head, the notification service probably just needs
> > > one instance, which supports multiple schedulers. It could have more
> > > instances for scalability.
> > > - Technically speaking, the notification service frontend itself is
> > > stateless, so failure recovery is simple: if the database of the
> > > notification service has HA, that should be sufficient.
> > > - Signals will be delivered to all the schedulers that are listening
> > > to that signal.
> > >
> > > 5. Does this now mean to get started out with Airflow you need to run
> > > > three components: webserver, scheduler and signal service?
> > >
> > >
> > > Not sure if there is a problem, but I imagine for a local deployment,
> > > a signal service could be a part of the webserver, just with one more
> > > REST endpoint.
> > >
> > > Thanks again for all the great feedback and looking forward to your
> > > suggestions.
> > >
> > > Cheers,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1] https://www.youtube.com/watch?v=xiYJTCj2zUU
> > > [2]
> > https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow
> > >
> > >
> > > On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <ash@apache.org>
> > wrote:
> > >
> > > > I also agree with Kevin - I'd love to see better streaming support in
> > > > Airflow, but I'm not sure this is the way to go about it. Something
> > > > about it feels not quite right.
> > > >
> > > > And I'm also not a fan of the name -- perhaps just my background and
> > > > dealing with the scheduler and executor code -- but to me a signal
> is a
> > > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> > described
> > > > as "Event-driven Scheduling".
> > > >
> > > >
> > > > To take your example:
> > > >
> > > > > For example, in machine learning, an online
> > > > > learning job is a streaming job running forever. It emits a model
> > > > > periodically and a batch evaluate job should run to validate each
> > model
> > > >
> > > > This is possible right now in Airflow by making an API request to
> > > > trigger a run of the validation DAG.
> > > >
> > > > In terms of your proposed class layout:
> > > >
> > > > class BaseOperator(SignalOperator, LoggingMixin):
> > > >
> > > > Don't create the extra class, just put new things on BaseOperator.
> > > >
> > > > As Chris said:
> > > >
> > > > > Are you saying that you actually have tasks in Airflow that are
> > > > > intended to run indefinitely?
> > > >
> > > > 1. How is Airflow meant to handle that task? Right now it runs it
> _and
> > > > waits for completion_. It's not clear you've actually thought through
> > > > everything this change would involve at the code level --
> specifically
> > > > in the Executors.
> > > >
> > > > 2. Doesn't this require custom code inside the streaming job to
> report
> > > > events back to Airflow?
> > > >
> > > > I am very uneasy about _needing_ to make changes in the "task code"
> to
> > > > support running it under Airflow.
> > > >
> > > > One of the big plus points to me is that right now Airflow can
> > > > run whatever tasks or jobs you like, without your job code needing to
> > > > know or care about Airflow. This approach changes that. And this is
> not
> > > > a decision to be made lightly.
> > > >
> > > > 3. How do we notice if that long running task dies without a trace --
> > > > i.e. if it crashes hard/the Kubernetes node it is on dies, etc and
> > > > never signals that it errored how does Airflow know?
> > > >
> > > > 4. Your new key component, the Signal Notification Service, is not
> > > > clearly defined.
> > > >
> > > > Where does it store things?
> > > >
> > > > Does it need persistent storage, or is in-memory good enough?
> > > >
> > > > Does it need at-most-once delivery semantics? At-least-once delivery?
> > > >
> > > > Does the scheduler pull from it, or does signal service push to the
> > > > scheduler.
> > > >
> > > > What happens when there are more than one scheduler running (AIP-15,
> > > > which I am working on right now)
> > > >
> > > > How do we run more than one notification service component (i.e. HA
> --
> > > > once the scheduler is HA we can't have any other new components
> being a
> > > > single point of failure)
> > > >
> > > > Do signals get delivered to just one scheduler, or all schedulers?
> > > >
> > > > 5. Does this now mean to get started out with Airflow you need to run
> > > > three components: webserver, scheduler and signal service?
> > > >
> > > > (Although the performance on a laptop doesn't matter _too_ much I am
> > > > concerned that we keep the "getting started" experience as easy as
> > > > possible for new users)
> > > >
> > > > In short I'd like to see _a lot_ more detail on this proposal. It's
> > > > proposing a fundamental change to Airflow and there are still lots of
> > > > things not covered in enough detail.
> > > >
> > > > I'm also not sure it's even needed for your example workflow. For
> > > > instance one way of dealing with it right now would be:
> > > >
> > > > - A task to start a streaming job. This is essentially
> fire-and-forget.
> > > > - A dag with schedule_interval=None that is "manually" invoked (by
> API
> > > > request from within your streaming code) with these two tasks:
> > > >
> > > > model build task >> model validation task >> streaming "restart" task
> > > >
> > > > i.e. this manually triggered dag does the job of your signals proposal
> > > > if I've understood your example correctly. I'm not saying this is a
> > > > _good_ way of doing it, just A way. Have I correctly understood your
> > > > example?
> > > >
> > > >
> > > > -ash
> > > >
> > > >
> > > >
> > > > On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicholas@gmail.com> wrote:
> > > >
> > > > > Hi Gerard,
> > > > >
> > > > > If users follow the definition of SignalOperator correctly, the
> > > > > idea for the streaming-triggered-batch case is to restart the
> > > > > execution for evaluations of the online trained model. In other
> > > > > words, once the evaluation operator receives the signal from the
> > > > > online learning operator, the scheduler takes the RESTART action to
> > > > > restart the evaluation task on the indefinitely running DAGRun.
> > > > > When a new signal is received, the scheduler checks with the
> > > > > SignalOperators to determine the action of the operator and carries
> > > > > out that action. The scheduler then also updates the state of the
> > > > > task instance rather than clearing the operator's state in the
> > > > > metadata database. At this time, the scheduler cuts out the
> > > > > subgraph from the DAG and constructs the sub DAG to run.
> > > > >
> > > > > While keeping a history of operator execution, the scheduler could
> > > > > use signals and the execution history to organize the signal
> > > > > conditions of the operators. Once the evaluation operator receives
> > > > > a new signal, the scheduler checks whether the signal and the
> > > > > previous executions meet the conditions. If the received signal
> > > > > meets the corresponding condition and the execution of the task
> > > > > instances also meets the conditions, the scheduler would take the
> > > > > RESTART action repeatedly for model evaluation.
> > > > >
> > > > > To support signal based triggers, signal-based scheduling requires
> > > > > the introduction of SignalOperator for the operators that send
> > > > > signals. With the SignalOperator, the scheduler could switch to
> > > > > using the signals of the operator and the corresponding condition
> > > > > to decide the action for the operator. No matter whether the
> > > > > relationship between upstream tasks and downstream tasks is
> > > > > one-to-one or one-to-many, this scheduling only checks whether the
> > > > > received signals of the SignalOperator meet the signal conditions,
> > > > > and is only concerned with which action to take when these
> > > > > conditions are met.
> > > > >
> > > > > By the way, signal-based scheduling needs a few more things to map
> > > > signals
> > > > > to corresponding actions.
> > > > >
> > > > > Regards,
> > > > > Nicholas Jiang
> > > > >
> > > > > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> > > > > <gcasassaez@twitter.com.invalid> wrote:
> > > > >
> > > > > > That looks interesting. My main worry is how to handle multiple
> > > > > > executions of Signal based operators. If I follow your definition
> > > > > > correctly, the idea is to run multiple evaluations of the online
> > > > > > trained model (on a permanently running DAGRun). So what happens
> > > > > > when you have triggered the downstream operators using a signal
> > > > > > once? Do you clear the state of the operators once a new signal
> > > > > > is generated? Do you generate a new DAGRun?
> > > > > >
> > > > > > How do you evaluate the model several times while keeping a
> > > > > > history of operator execution?
> > > > > >
> > > > > > Related to this, TFX has a similar proposal for ASYNC DAGs,
> > > > > > which basically describes something similar to what you are
> > > > > > proposing in this AIP:
> > > > > > https://github.com/tensorflow/community/pull/253 (interesting
> > > > > > read as it’s also related to the ML field).
> > > > > >
> > > > > > My main concern would be that you may need to change a few more
> > > > > > things to support signal based triggers, as the relationship
> > > > > > between upstream tasks and downstream tasks is no longer 1:1 but
> > > > > > 1:many.
> > > > > >
> > > > > > Gerard Casas Saez
> > > > > > Twitter | Cortex | @casassaez
> > > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicholas@gmail.com>,
> > > > > > wrote:
> > > > > > > Hello everyone,
> > > > > > >
> > > > > > > Sending a message to everyone and collecting feedback on the
> > > > > > > AIP-35 on adding signal-based scheduling. This was previously
> > > > > > > briefly mentioned in the discussion on the development slack
> > > > > > > channel. The key motivation of this proposal is to support a
> > > > > > > mixture of batch and stream jobs in the same workflow.
> > > > > > >
> > > > > > > In practice, there is a lot of business logic that needs
> > > > > > > collaboration between streaming and batch jobs. For example, in
> > > > > > > machine learning, an online learning job is a streaming job
> > > > > > > running forever. It emits a model periodically and a batch
> > > > > > > evaluate job should run to validate each model. So this is a
> > > > > > > streaming-triggered-batch case. If we continue from the above
> > > > > > > example, once the model passes the validation, the model needs
> > > > > > > to be deployed to a serving job. That serving job could be a
> > > > > > > streaming job that keeps polling records from Kafka and uses a
> > > > > > > model to do prediction. And the availability of a new model
> > > > > > > requires the streaming prediction job either to restart, or to
> > > > > > > reload the new model on the fly. In either case, it is a
> > > > > > > batch-to-streaming job trigger.
> > > > > > >
> > > > > > > At this point, people basically have to invent their own way
> > > > > > > to deal with such workflows consisting of both batch and
> > > > > > > streaming jobs. I think having a system that can help smoothly
> > > > > > > work with a mixture of streaming and batch jobs is valuable
> > > > > > > here.
> > > > > > >
> > > > > > > This AIP-35 focuses on signal-based scheduling to let the
> > > > > > > operators send signals to the scheduler to trigger a scheduling
> > > > > > > action, such as starting jobs, stopping jobs and restarting
> > > > > > > jobs. With the change of scheduling mechanism, a streaming job
> > > > > > > can send signals to the scheduler to indicate the state change
> > > > > > > of the dependencies.
> > > > > > >
> > > > > > > Signal-based scheduling allows the scheduler to know about the
> > > > > > > change of the dependency state immediately, without
> > > > > > > periodically querying the metadata database. This also allows
> > > > > > > potential support for richer scheduling semantics such as
> > > > > > > periodic execution and manual trigger at per-operator
> > > > > > > granularity.
> > > > > > >
> > > > > > > Changes proposed:
> > > > > > >
> > > > > > > - Signals are used to define conditions that must be met to
> > > > > > > run an operator. State change of the upstream tasks is one type
> > > > > > > of signal. There may be other types of signals. The scheduler
> > > > > > > may take different actions when receiving different signals. To
> > > > > > > let the operators take signals as their starting condition, we
> > > > > > > propose to introduce SignalOperator, which is mentioned in the
> > > > > > > public interface section.
> > > > > > > - A notification service is necessary to help receive and
> > > > > > > propagate the signals from the operators and other sources to
> > > > > > > the scheduler. Upon receiving a signal, the scheduler can take
> > > > > > > action according to the predefined signal-based conditions on
> > > > > > > the operators. Therefore we propose to introduce a Signal
> > > > > > > Notification Service component to Airflow.
> > > > > > >
> > > > > > > Please see related documents and PRs for details:
> > > > > > >
> > > > > > > AIP:
> > > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > > > >
> > > > > > > Please let me know if there are any aspects that you
> > > > > > > agree/disagree with or that need more clarification (especially
> > > > > > > the SignalOperator and SignalService).
> > > > > > > Any comments are welcome and I am looking forward to it!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Nicholas
> > > > > >
> > > > >
> > > >
> >
>
